
PG,ReplicatedPG: Generalize missing_loc for ECBackend

Prior to EC pools, unfound => missing.  Now, whether an object is unfound
(unreadable, really) depends on the PGBackend requirements for reconstituting
that object.  This also means that recovering an object which is missing on a
replica but not on the primary requires tracking the missing_loc set.
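
For illustration only (not code from this commit, though the
IsReadablePredicate shape matches the one added to PGBackend below): the
replicated backend can read an object as long as the primary's own shard has
it, whereas a hypothetical EC predicate would need enough shards to
reconstitute the object.  ECReadPred and its k parameter are assumptions
here, not the real ECBackend.

  // Sketch: backend-specific "readable with these shards".
  #include <cstddef>
  #include <set>

  struct pg_shard_t {
    int osd;
    int shard;
    bool operator<(const pg_shard_t &o) const {
      return osd < o.osd || (osd == o.osd && shard < o.shard);
    }
  };

  // Same shape as the PGBackend::IsReadablePredicate added in this commit.
  struct IsReadablePredicate {
    virtual bool operator()(const std::set<pg_shard_t> &have) const = 0;
    virtual ~IsReadablePredicate() {}
  };

  // Replicated pools: readable iff we (the primary shard) have the object.
  struct RPCReadPred : IsReadablePredicate {
    pg_shard_t whoami;
    explicit RPCReadPred(pg_shard_t w) : whoami(w) {}
    bool operator()(const std::set<pg_shard_t> &have) const {
      return have.count(whoami);
    }
  };

  // Hypothetical EC predicate: readable once at least k shards are present
  // (a real backend would also care *which* shard ids those are).
  struct ECReadPred : IsReadablePredicate {
    size_t k;
    explicit ECReadPred(size_t k) : k(k) {}
    bool operator()(const std::set<pg_shard_t> &have) const {
      return have.size() >= k;
    }
  };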

Thus, rather than maintaining missing_loc only for objects missing
on the primary, the MissingLoc structure will track all missing
objects actingbackfill-wide until each object is recovered.
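
A toy model of what MissingLoc ends up tracking (stand-in types only; the
real structure is the PG::MissingLoc class added in this diff):

  #include <cstdint>
  #include <map>
  #include <set>
  #include <string>

  using hobject_t = std::string;   // stand-in for Ceph's object id
  using pg_shard_t = int;          // stand-in for (osd, shard)

  struct MissingLocModel {
    // every object needing recovery anywhere in actingbackfill -> needed version
    std::map<hobject_t, uint64_t> needs_recovery_map;
    // object -> shards known to hold a usable copy
    std::map<hobject_t, std::set<pg_shard_t> > missing_loc;

    void add_active_missing(const hobject_t &oid, uint64_t need) {
      needs_recovery_map.emplace(oid, need);   // union over the actingbackfill set
    }
    void add_location(const hobject_t &oid, pg_shard_t loc) {
      missing_loc[oid].insert(loc);
    }
    void recovered(const hobject_t &oid) {     // recovered on every shard
      needs_recovery_map.erase(oid);
      missing_loc.erase(oid);
    }
    // "unfound" now means: needs recovery, and the known locations are not
    // enough for the backend to reconstruct it ("no location at all" stands
    // in for the recoverability predicate here).
    uint64_t num_unfound() const {
      uint64_t n = 0;
      for (std::map<hobject_t, uint64_t>::const_iterator i =
             needs_recovery_map.begin();
           i != needs_recovery_map.end();
           ++i) {
        std::map<hobject_t, std::set<pg_shard_t> >::const_iterator j =
          missing_loc.find(i->first);
        if (j == missing_loc.end() || j->second.empty())
          ++n;
      }
      return n;
    }
  };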

For simplicity, since we don't really know what objects need recovery
until activation (and since we can't do anything with that information
prior to activation anyway), we defer populating the missing_loc
information until activation.

We need peers to roll back divergent log entries before we attempt to
read the relevant objects.  The simplest way to accomplish this seems to
be to simply always activate peers whenever search_for_missing turns up
missing objects.

Due to EC pools, checking missing is necessary, but not sufficient, for
determining readability.  Thus, we instead check is_unreadable for cases where
we need to read the object and reserve is_missing for cases where we only need
the object context.
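
A minimal sketch of that split (stand-in types; the real helpers are
is_missing_object()/is_unreadable_object() in ReplicatedPG below, and
readable_with_acting() defers to the backend's readability predicate):

  #include <set>
  #include <string>

  using hobject_t = std::string;
  using pg_shard_t = int;

  struct PGSketch {
    std::set<hobject_t> local_missing;    // objects missing on this (primary) OSD
    std::set<pg_shard_t> actingset;

    // Stand-in: the real code asks MissingLoc, which applies the backend's
    // IsReadablePredicate to the known locations intersected with acting.
    bool readable_with_acting(const hobject_t &,
                              const std::set<pg_shard_t> &) const {
      return true;
    }

    // Need the object context?  The local missing set is what matters.
    bool is_missing_object(const hobject_t &oid) const {
      return local_missing.count(oid);
    }
    // Need to read the object?  Missing locally is no longer the whole story.
    bool is_unreadable_object(const hobject_t &oid) const {
      return is_missing_object(oid) ||
        !readable_with_acting(oid, actingset);
    }
  };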

waiting_for_missing_object becomes waiting_for_unreadable_object in order to
avoid having another layer of waiting_for_* maps.  These ops may be requeued
either when the object is recovered on the primary or when it is no longer
degraded, depending on when the object becomes readable.
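
Sketched with stand-in types, the two requeue points look roughly like this
(on_local_recover/on_global_recover correspond to the ReplicatedPG hooks in
the diff below; OpRequestRef is reduced to a plain callback here):

  #include <functional>
  #include <list>
  #include <map>
  #include <string>

  using hobject_t = std::string;
  using OpRequestRef = std::function<void()>;   // stand-in: "re-run this op"

  std::map<hobject_t, std::list<OpRequestRef> > waiting_for_unreadable_object;
  std::map<hobject_t, std::list<OpRequestRef> > waiting_for_degraded_object;

  static void requeue_ops(std::list<OpRequestRef> &ops) {
    for (std::list<OpRequestRef>::iterator i = ops.begin(); i != ops.end(); ++i)
      (*i)();                                    // back onto the op queue
    ops.clear();
  }

  // The primary's copy of oid has been recovered and is readable again.
  void on_local_recover(const hobject_t &oid) {
    std::map<hobject_t, std::list<OpRequestRef> >::iterator i =
      waiting_for_unreadable_object.find(oid);
    if (i != waiting_for_unreadable_object.end()) {
      requeue_ops(i->second);
      waiting_for_unreadable_object.erase(i);
    }
  }

  // oid has been recovered on every shard (no longer degraded).
  void on_global_recover(const hobject_t &oid) {
    on_local_recover(oid);                       // unreadable waiters, if any left
    std::map<hobject_t, std::list<OpRequestRef> >::iterator i =
      waiting_for_degraded_object.find(oid);
    if (i != waiting_for_degraded_object.end()) {
      requeue_ops(i->second);
      waiting_for_degraded_object.erase(i);
    }
  }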

Signed-off-by: Samuel Just <sam.just@inktank.com>
Samuel Just 2014-01-29 13:38:04 -08:00
parent 8b33d60fc4
commit 84e2f39c55
7 changed files with 392 additions and 197 deletions

View File

@ -4306,11 +4306,11 @@ void OSD::do_command(Connection *con, tid_t tid, vector<string>& cmd, bufferlist
pg->pg_log.get_missing().missing.begin();
for (; mi != mend; ++mi) {
fout << mi->first << " -> " << mi->second << std::endl;
map<hobject_t, set<pg_shard_t> >::const_iterator mli =
pg->missing_loc.find(mi->first);
if (mli == pg->missing_loc.end())
if (!pg->missing_loc.needs_recovery(mi->first))
continue;
const set<pg_shard_t> &mls(mli->second);
if (pg->missing_loc.is_unfound(mi->first))
fout << " unfound ";
const set<pg_shard_t> &mls(pg->missing_loc.get_locations(mi->first));
if (mls.empty())
continue;
fout << "missing_loc: " << mls << std::endl;

View File

@ -43,9 +43,10 @@
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
static ostream& _prefix(std::ostream *_dout, const PG *pg)
template <class T>
static ostream& _prefix(std::ostream *_dout, T *t)
{
return *_dout << pg->gen_prefix();
return *_dout << t->gen_prefix();
}
void PG::get(const string &tag)
@ -161,6 +162,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
info(p),
info_struct_v(0),
coll(p), pg_log(cct), log_oid(loid), biginfo_oid(ioid),
missing_loc(this),
recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
recovery_ops_active(0),
role(0),
@ -245,7 +247,6 @@ void PG::proc_master_log(
dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl;
might_have_unfound.insert(from);
search_for_missing(oinfo, &omissing, from);
peer_missing[from].swap(omissing);
}
@ -263,7 +264,6 @@ void PG::proc_replica_log(
dout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl;
might_have_unfound.insert(from);
search_for_missing(oinfo, &omissing, from);
for (map<hobject_t, pg_missing_t::item>::iterator i = omissing.missing.begin();
i != omissing.missing.end();
++i) {
@ -292,7 +292,8 @@ bool PG::proc_replica_info(pg_shard_t from, const pg_info_t &oinfo)
reg_next_scrub();
// stray?
if (!is_acting(from)) {
if ((!is_active() && !is_acting(from)) ||
(is_active() && !is_actingbackfill(from))) {
dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
stray_set.insert(from);
if (is_clean()) {
@ -374,19 +375,57 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
* Instead, we probably want to just iterate over our unfound set.
*/
bool PG::search_for_missing(
const pg_info_t &oinfo, const pg_missing_t *omissing,
pg_shard_t fromosd)
const pg_info_t &oinfo, const pg_missing_t &omissing,
pg_shard_t from,
RecoveryCtx *ctx)
{
bool stats_updated = false;
bool found_missing = false;
unsigned num_unfound_before = missing_loc.num_unfound();
bool found_missing = missing_loc.add_source_info(
from, oinfo, omissing);
if (found_missing && num_unfound_before != missing_loc.num_unfound())
publish_stats_to_osd();
if (found_missing &&
(get_osdmap()->get_features(NULL) & CEPH_FEATURE_OSD_ERASURE_CODES)) {
pg_info_t tinfo(oinfo);
tinfo.pgid.shard = pg_whoami.shard;
(*(ctx->info_map))[from.osd].push_back(
make_pair(
pg_notify_t(
from.shard, pg_whoami.shard,
get_osdmap()->get_epoch(),
get_osdmap()->get_epoch(),
tinfo),
past_intervals));
}
return found_missing;
}
// take note that we've probed this peer, for
// all_unfound_are_queried_or_lost()'s benefit.
peer_missing[fromosd];
bool PG::MissingLoc::readable_with_acting(
const hobject_t &hoid,
const set<pg_shard_t> &acting) const {
if (!needs_recovery(hoid)) return true;
if (!missing_loc.count(hoid)) return false;
const set<pg_shard_t> &locs = missing_loc.find(hoid)->second;
dout(10) << __func__ << ": locs:" << locs << dendl;
set<pg_shard_t> have_acting;
for (set<pg_shard_t>::const_iterator i = locs.begin();
i != locs.end();
++i) {
if (acting.count(*i))
have_acting.insert(*i);
}
return (*is_readable)(have_acting);
}
bool PG::MissingLoc::add_source_info(
pg_shard_t fromosd,
const pg_info_t &oinfo,
const pg_missing_t &omissing)
{
bool found_missing = false;;
// found items?
for (map<hobject_t,pg_missing_t::item>::const_iterator p = pg_log.get_missing().missing.begin();
p != pg_log.get_missing().missing.end();
for (map<hobject_t,pg_missing_t::item>::const_iterator p = needs_recovery_map.begin();
p != needs_recovery_map.end();
++p) {
const hobject_t &soid(p->first);
eversion_t need = p->second.need;
@ -407,53 +446,29 @@ bool PG::search_for_missing(
continue;
}
if (oinfo.last_complete < need) {
if (!omissing) {
// We know that the peer lacks some objects at the revision we need.
// Without the peer's missing set, we don't know whether it has this
// particular object or not.
dout(10) << __func__ << " " << soid << " " << need
<< " might also be missing on osd." << fromosd << dendl;
continue;
}
if (omissing->is_missing(soid)) {
if (omissing.is_missing(soid)) {
dout(10) << "search_for_missing " << soid << " " << need
<< " also missing on osd." << fromosd << dendl;
continue;
}
}
dout(10) << "search_for_missing " << soid << " " << need
<< " is on osd." << fromosd << dendl;
map<hobject_t, set<pg_shard_t> >::iterator ml = missing_loc.find(soid);
if (ml == missing_loc.end()) {
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(soid);
if (wmo != waiting_for_missing_object.end()) {
requeue_ops(wmo->second);
}
stats_updated = true;
missing_loc[soid].insert(fromosd);
missing_loc_sources.insert(fromosd);
}
else {
ml->second.insert(fromosd);
missing_loc_sources.insert(fromosd);
}
missing_loc[soid].insert(fromosd);
missing_loc_sources.insert(fromosd);
found_missing = true;
}
if (stats_updated) {
publish_stats_to_osd();
}
dout(20) << "search_for_missing missing " << pg_log.get_missing().missing << dendl;
dout(20) << "needs_recovery_map missing " << needs_recovery_map << dendl;
return found_missing;
}
void PG::discover_all_missing(map<int, map<spg_t,pg_query_t> > &query_map)
{
const pg_missing_t &missing = pg_log.get_missing();
assert(missing.have_missing());
assert(have_unfound());
dout(10) << __func__ << " "
<< missing.num_missing() << " missing, "
@ -801,7 +816,6 @@ void PG::clear_primary_state()
finish_sync_event = 0; // so that _finish_recvoery doesn't go off in another thread
missing_loc.clear();
missing_loc_sources.clear();
pg_log.reset_recovery_pointers();
@ -1372,7 +1386,8 @@ void PG::activate(ObjectStore::Transaction& t,
map<int,
vector<
pair<pg_notify_t,
pg_interval_map_t> > > *activator_map)
pg_interval_map_t> > > *activator_map,
RecoveryCtx *ctx)
{
assert(!is_active());
assert(scrubber.callbacks.empty());
@ -1401,20 +1416,11 @@ void PG::activate(ObjectStore::Transaction& t,
send_notify = false;
info.last_epoch_started = query_epoch;
if (is_acting(pg_whoami))
info.last_epoch_started = query_epoch;
const pg_missing_t &missing = pg_log.get_missing();
if (is_primary()) {
// If necessary, create might_have_unfound to help us find our unfound objects.
// NOTE: It's important that we build might_have_unfound before trimming the
// past intervals.
might_have_unfound.clear();
if (missing.have_missing()) {
build_might_have_unfound();
}
}
if (is_primary()) {
last_update_ondisk = info.last_update;
min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
@ -1465,6 +1471,7 @@ void PG::activate(ObjectStore::Transaction& t,
// if primary..
if (is_primary()) {
assert(ctx);
// start up replicas
assert(actingbackfill.size() > 0);
@ -1566,6 +1573,51 @@ void PG::activate(ObjectStore::Transaction& t,
}
}
// Set up missing_loc
for (set<pg_shard_t>::iterator i = actingbackfill.begin();
i != actingbackfill.end();
++i) {
if (*i == get_primary()) {
missing_loc.add_active_missing(pg_log.get_missing());
} else {
assert(peer_missing.count(*i));
missing_loc.add_active_missing(peer_missing[*i]);
}
}
// If necessary, create might_have_unfound to help us find our unfound objects.
// NOTE: It's important that we build might_have_unfound before trimming the
// past intervals.
might_have_unfound.clear();
if (needs_recovery()) {
missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing());
for (set<pg_shard_t>::iterator i = actingbackfill.begin();
i != actingbackfill.end();
++i) {
if (*i == pg_whoami) continue;
dout(10) << __func__ << ": adding " << *i << " as a source" << dendl;
assert(peer_missing.count(*i));
assert(peer_info.count(*i));
missing_loc.add_source_info(
*i,
peer_info[*i],
peer_missing[*i]);
}
for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
i != peer_missing.end();
++i) {
if (is_actingbackfill(i->first))
continue;
assert(peer_info.count(i->first));
search_for_missing(
peer_info[i->first],
i->second,
i->first,
ctx);
}
build_might_have_unfound();
}
// degraded?
if (get_osdmap()->get_pg_size(info.pgid.pgid) > acting.size())
state_set(PG_STATE_DEGRADED);
@ -1938,7 +1990,7 @@ void PG::split_ops(PG *child, unsigned split_bits) {
unsigned match = child->info.pgid.ps();
assert(waiting_for_all_missing.empty());
assert(waiting_for_cache_not_full.empty());
assert(waiting_for_missing_object.empty());
assert(waiting_for_unreadable_object.empty());
assert(waiting_for_degraded_object.empty());
assert(waiting_for_ack.empty());
assert(waiting_for_ondisk.empty());
@ -3392,11 +3444,10 @@ void PG::repair_object(
peer_missing[bad_peer].add(soid, oi.version, eversion_t());
} else {
// We should only be scrubbing if the PG is clean.
assert(waiting_for_missing_object.empty());
assert(waiting_for_unreadable_object.empty());
pg_log.missing_add(soid, oi.version, eversion_t());
missing_loc[soid].insert(ok_peer);
missing_loc_sources.insert(ok_peer);
missing_loc.add_location(soid, ok_peer);
pg_log.set_last_requested(0);
}
@ -6037,7 +6088,8 @@ PG::RecoveryState::Active::Active(my_context ctx)
pg->get_osdmap()->get_epoch(),
*context< RecoveryMachine >().get_on_safe_context_list(),
*context< RecoveryMachine >().get_query_map(),
context< RecoveryMachine >().get_info_map());
context< RecoveryMachine >().get_info_map(),
context< RecoveryMachine >().get_recovery_ctx());
assert(pg->is_active());
dout(10) << "Activate Finished" << dendl;
}
@ -6099,7 +6151,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
if (pg->cct->_conf->osd_check_for_log_corruption)
pg->check_log_for_corruption(pg->osd->store);
int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size();
int unfound = pg->missing_loc.num_unfound();
if (unfound > 0 &&
pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) {
if (pg->cct->_conf->osd_auto_mark_unfound_lost) {
@ -6179,8 +6231,14 @@ boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt
dout(10) << "searching osd." << logevt.from
<< " log for unfound items" << dendl;
PG *pg = context< RecoveryMachine >().pg;
bool got_missing = pg->search_for_missing(logevt.msg->info,
&logevt.msg->missing, logevt.from);
pg->proc_replica_log(
*context<RecoveryMachine>().get_cur_transaction(),
logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
bool got_missing = pg->search_for_missing(
pg->peer_info[logevt.from],
pg->peer_missing[logevt.from],
logevt.from,
context< RecoveryMachine >().get_recovery_ctx());
if (got_missing)
pg->osd->queue_for_recovery(pg);
return discard_event();
@ -6292,7 +6350,7 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(
pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
actevt.query_epoch,
*context< RecoveryMachine >().get_on_safe_context_list(),
query_map, NULL);
query_map, NULL, NULL);
dout(10) << "Activate Finished" << dendl;
return discard_event();
}
@ -6927,7 +6985,6 @@ PG::RecoveryState::GetMissing::GetMissing(my_context ctx)
// can infer the rest!
dout(10) << " osd." << *i << " has no missing, identical log" << dendl;
pg->peer_missing[*i];
pg->search_for_missing(pi, &pg->peer_missing[*i], *i);
continue;
}
@ -7072,17 +7129,10 @@ boost::statechart::result PG::RecoveryState::WaitUpThru::react(const ActMap& am)
boost::statechart::result PG::RecoveryState::WaitUpThru::react(const MLogRec& logevt)
{
dout(10) << "searching osd." << logevt.from
<< " log for unfound items" << dendl;
dout(10) << "Noting missing from osd." << logevt.from << dendl;
PG *pg = context< RecoveryMachine >().pg;
bool got_missing = pg->search_for_missing(logevt.msg->info,
&logevt.msg->missing, logevt.from);
// hmm.. should we?
(void)got_missing;
//if (got_missing)
//pg->osd->queue_for_recovery(pg);
pg->peer_missing[logevt.from].swap(logevt.msg->missing);
pg->peer_info[logevt.from] = logevt.msg->info;
return discard_event();
}

View File

@ -300,8 +300,114 @@ public:
}
hobject_t log_oid;
hobject_t biginfo_oid;
map<hobject_t, set<pg_shard_t> > missing_loc;
set<pg_shard_t> missing_loc_sources; // superset of missing_loc locations
class MissingLoc {
map<hobject_t, pg_missing_t::item> needs_recovery_map;
map<hobject_t, set<pg_shard_t> > missing_loc;
set<pg_shard_t> missing_loc_sources;
PG *pg;
boost::scoped_ptr<PGBackend::IsReadablePredicate> is_readable;
boost::scoped_ptr<PGBackend::IsRecoverablePredicate> is_recoverable;
set<pg_shard_t> empty_set;
public:
MissingLoc(PG *pg)
: pg(pg) {}
void set_backend_predicates(
PGBackend::IsReadablePredicate *_is_readable,
PGBackend::IsRecoverablePredicate *_is_recoverable) {
is_readable.reset(_is_readable);
is_recoverable.reset(_is_recoverable);
}
string gen_prefix() const { return pg->gen_prefix(); }
bool needs_recovery(
const hobject_t &hoid,
eversion_t *v = 0) const {
map<hobject_t, pg_missing_t::item>::const_iterator i =
needs_recovery_map.find(hoid);
if (i == needs_recovery_map.end())
return false;
if (v)
*v = i->second.need;
return true;
}
bool is_unfound(const hobject_t &hoid) const {
return needs_recovery(hoid) && (
!missing_loc.count(hoid) ||
!(*is_recoverable)(missing_loc.find(hoid)->second));
}
bool readable_with_acting(
const hobject_t &hoid,
const set<pg_shard_t> &acting) const;
uint64_t num_unfound() const {
uint64_t ret = 0;
for (map<hobject_t, pg_missing_t::item>::const_iterator i =
needs_recovery_map.begin();
i != needs_recovery_map.end();
++i) {
if (is_unfound(i->first))
++ret;
}
return ret;
}
void clear() {
needs_recovery_map.clear();
missing_loc.clear();
missing_loc_sources.clear();
}
void add_location(const hobject_t &hoid, pg_shard_t location) {
missing_loc[hoid].insert(location);
}
void remove_location(const hobject_t &hoid, pg_shard_t location) {
missing_loc[hoid].erase(location);
}
void add_active_missing(const pg_missing_t &missing) {
for (map<hobject_t, pg_missing_t::item>::const_iterator i =
missing.missing.begin();
i != missing.missing.end();
++i) {
map<hobject_t, pg_missing_t::item>::const_iterator j =
needs_recovery_map.find(i->first);
if (j == needs_recovery_map.end()) {
needs_recovery_map.insert(*i);
} else {
assert(i->second.need == j->second.need);
}
}
}
void revise_need(const hobject_t &hoid, eversion_t need) {
assert(needs_recovery(hoid));
needs_recovery_map[hoid].need = need;
}
/// Adds info about a possible recovery source
bool add_source_info(
pg_shard_t source, ///< [in] source
const pg_info_t &oinfo, ///< [in] info
const pg_missing_t &omissing ///< [in] (optional) missing
); ///< @return whether a new object location was discovered
/// Uses osdmap to update structures for now down sources
void check_recovery_sources(const OSDMapRef osdmap);
/// Call when hoid is no longer missing in acting set
void recovered(const hobject_t &hoid) {
needs_recovery_map.erase(hoid);
missing_loc.erase(hoid);
}
const set<pg_shard_t> &get_locations(const hobject_t &hoid) const {
return missing_loc.count(hoid) ?
missing_loc.find(hoid)->second : empty_set;
}
const map<hobject_t, set<pg_shard_t> > &get_missing_locs() const {
return missing_loc;
}
const map<hobject_t, pg_missing_t::item> &get_needs_recovery() const {
return needs_recovery_map;
}
} missing_loc;
interval_set<snapid_t> snap_collections; // obsolete
map<epoch_t,pg_interval_t> past_intervals;
@ -540,7 +646,7 @@ protected:
list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_cache_not_full;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
map<hobject_t, list<OpRequestRef> > waiting_for_unreadable_object,
waiting_for_degraded_object,
waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
@ -730,8 +836,9 @@ public:
pg_log_t &olog, pg_shard_t from);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
bool search_for_missing(
const pg_info_t &oinfo, const pg_missing_t *omissing,
pg_shard_t fromosd);
const pg_info_t &oinfo, const pg_missing_t &omissing,
pg_shard_t fromosd,
RecoveryCtx*);
void check_for_lost_objects();
void forget_lost_objects();
@ -779,17 +886,18 @@ public:
list<Context*>& tfin,
map<int, map<spg_t,pg_query_t> >& query_map,
map<int,
vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map=0);
vector<pair<pg_notify_t, pg_interval_map_t> > > *activator_map,
RecoveryCtx *ctx);
void _activate_committed(epoch_t e);
void all_activated_and_committed();
void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info);
bool have_unfound() const {
return pg_log.get_missing().num_missing() > missing_loc.size();
return missing_loc.num_unfound();
}
int get_num_unfound() const {
return pg_log.get_missing().num_missing() - missing_loc.size();
return missing_loc.num_unfound();
}
virtual void check_local() = 0;
@ -1273,6 +1381,8 @@ public:
return &(state->rctx->on_applied->contexts);
}
RecoveryCtx *get_recovery_ctx() { return state->rctx; }
void send_notify(pg_shard_t to,
const pg_notify_t &info, const pg_interval_map_t &pi) {
assert(state->rctx->notify_list);

View File

@ -315,6 +315,16 @@
};
virtual IsRecoverablePredicate *get_is_recoverable_predicate() = 0;
class IsReadablePredicate {
public:
/**
* have encodes the shards available
*/
virtual bool operator()(const set<pg_shard_t> &have) const = 0;
virtual ~IsReadablePredicate() {}
};
virtual IsReadablePredicate *get_is_readable_predicate() = 0;
void temp_colls(list<coll_t> *out) {
if (temp_created)
out->push_back(temp_coll);

View File

@ -80,6 +80,18 @@ public:
return new RPCRecPred;
}
class RPCReadPred : public IsReadablePredicate {
pg_shard_t whoami;
public:
RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
bool operator()(const set<pg_shard_t> &have) const {
return have.count(whoami);
}
};
IsReadablePredicate *get_is_readable_predicate() {
return new RPCReadPred(get_parent()->whoami_shard());
}
virtual void dump_recovery_info(Formatter *f) const {
{
f->open_array_section("pull_from_peer");

View File

@ -255,14 +255,17 @@ void ReplicatedPG::on_local_recover(
t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
publish_stats_to_osd();
if (waiting_for_missing_object.count(hoid)) {
dout(20) << " kicking waiters on " << hoid << dendl;
requeue_ops(waiting_for_missing_object[hoid]);
waiting_for_missing_object.erase(hoid);
if (pg_log.get_missing().missing.size() == 0) {
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
}
assert(missing_loc.needs_recovery(hoid));
missing_loc.add_location(hoid, pg_whoami);
if (!is_unreadable_object(hoid) &&
waiting_for_unreadable_object.count(hoid)) {
dout(20) << " kicking unreadable waiters on " << hoid << dendl;
requeue_ops(waiting_for_unreadable_object[hoid]);
waiting_for_unreadable_object.erase(hoid);
}
if (pg_log.get_missing().missing.size() == 0) {
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
}
} else {
t->register_on_applied(
@ -285,6 +288,7 @@ void ReplicatedPG::on_local_recover(
void ReplicatedPG::on_global_recover(
const hobject_t &soid)
{
missing_loc.recovered(soid);
publish_stats_to_osd();
dout(10) << "pushed " << soid << " to all replicas" << dendl;
map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
@ -297,7 +301,13 @@ void ReplicatedPG::on_global_recover(
}
recovering.erase(i);
finish_recovery_op(soid);
if (waiting_for_unreadable_object.count(soid)) {
dout(20) << " kicking unreadable waiters on " << soid << dendl;
requeue_ops(waiting_for_unreadable_object[soid]);
waiting_for_unreadable_object.erase(soid);
}
if (waiting_for_degraded_object.count(soid)) {
dout(20) << " kicking degraded waiters on " << soid << dendl;
requeue_ops(waiting_for_degraded_object[soid]);
waiting_for_degraded_object.erase(soid);
}
@ -380,36 +390,32 @@ bool ReplicatedPG::same_for_rep_modify_since(epoch_t e)
// ====================
// missing objects
bool ReplicatedPG::is_missing_object(const hobject_t& soid)
bool ReplicatedPG::is_missing_object(const hobject_t& soid) const
{
return pg_log.get_missing().missing.count(soid);
}
void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
void ReplicatedPG::wait_for_unreadable_object(
const hobject_t& soid, OpRequestRef op)
{
assert(is_missing_object(soid));
assert(is_unreadable_object(soid));
const pg_missing_t &missing = pg_log.get_missing();
// we don't have it (yet).
map<hobject_t, pg_missing_t::item>::const_iterator g = missing.missing.find(soid);
assert(g != missing.missing.end());
const eversion_t &v(g->second.need);
eversion_t v;
bool needs_recovery = missing_loc.needs_recovery(soid, &v);
assert(needs_recovery);
map<hobject_t, ObjectContextRef>::const_iterator p = recovering.find(soid);
if (p != recovering.end()) {
dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl;
}
else if (missing_loc.find(soid) == missing_loc.end()) {
} else if (missing_loc.is_unfound(soid)) {
dout(7) << "missing " << soid << " v " << v << ", is unfound." << dendl;
}
else {
} else {
dout(7) << "missing " << soid << " v " << v << ", recovering." << dendl;
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
}
waiting_for_missing_object[soid].push_back(op);
waiting_for_unreadable_object[soid].push_back(op);
op->mark_delayed("waiting for missing object");
}
@ -641,7 +647,7 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
return -EROFS;
}
int unfound = missing.num_missing() - missing_loc.size();
int unfound = missing_loc.num_unfound();
if (!unfound) {
ss << "pg has no unfound objects";
return 0; // make command idempotent
@ -694,13 +700,13 @@ int ReplicatedPG::do_command(cmdmap_t cmdmap, ostream& ss,
p->second.dump(f.get()); // have, need keys
{
f->open_array_section("locations");
map<hobject_t,set<pg_shard_t> >::iterator q =
missing_loc.find(p->first);
if (q != missing_loc.end())
for (set<pg_shard_t>::iterator r = q->second.begin();
r != q->second.end();
if (missing_loc.needs_recovery(p->first)) {
for (set<pg_shard_t>::iterator r =
missing_loc.get_locations(p->first).begin();
r != missing_loc.get_locations(p->first).end();
++r)
f->dump_stream("shard") << *r;
}
f->close_section();
}
f->close_section();
@ -1019,6 +1025,9 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
temp_seq(0),
snap_trimmer_machine(this)
{
missing_loc.set_backend_predicates(
pgbackend->get_is_readable_predicate(),
pgbackend->get_is_recoverable_predicate());
snap_trimmer_machine.initiate();
}
@ -1179,8 +1188,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
// missing object?
if (is_missing_object(head)) {
wait_for_missing_object(head, op);
if (is_unreadable_object(head)) {
wait_for_unreadable_object(head, op);
return;
}
@ -1195,7 +1204,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
CEPH_SNAPDIR, m->get_pg().ps(), info.pgid.pool(),
m->get_object_locator().nspace);
if (is_missing_object(snapdir)) {
wait_for_missing_object(snapdir, op);
wait_for_unreadable_object(snapdir, op);
return;
}
@ -1231,7 +1240,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
!(m->get_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
// missing the specific snap we need; requeue and wait.
assert(!can_create); // only happens on a read
wait_for_missing_object(missing_oid, op);
wait_for_unreadable_object(missing_oid, op);
return;
}
}
@ -1309,11 +1318,11 @@ void ReplicatedPG::do_op(OpRequestRef op)
int r;
if (src_oid.is_head() && is_missing_object(src_oid)) {
wait_for_missing_object(src_oid, op);
wait_for_unreadable_object(src_oid, op);
} else if ((r = find_object_context(
src_oid, &sobc, false, &wait_oid)) == -EAGAIN) {
// missing the specific snap we need; requeue and wait.
wait_for_missing_object(wait_oid, op);
wait_for_unreadable_object(wait_oid, op);
} else if (r) {
if (!maybe_handle_cache(op, write_ordered, sobc, r, wait_oid, true))
osd->reply_op_error(op, r);
@ -1373,7 +1382,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
int r = find_object_context(clone_oid, &sobc, false, &wait_oid);
if (r == -EAGAIN) {
// missing the specific snap we need; requeue and wait.
wait_for_missing_object(wait_oid, op);
wait_for_unreadable_object(wait_oid, op);
} else if (r) {
if (!maybe_handle_cache(op, write_ordered, sobc, r, wait_oid, true))
osd->reply_op_error(op, r);
@ -4512,7 +4521,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
assert(is_missing_object(missing_oid));
dout(20) << "_rollback_to attempted to roll back to a missing object "
<< missing_oid << " (requested snapid: ) " << snapid << dendl;
wait_for_missing_object(missing_oid, ctx->op);
wait_for_unreadable_object(missing_oid, ctx->op);
return ret;
}
if (maybe_handle_cache(ctx->op, true, rollback_to, ret, missing_oid, true)) {
@ -7456,8 +7465,7 @@ int ReplicatedPG::recover_missing(
int priority,
PGBackend::RecoveryHandle *h)
{
map<hobject_t, set<pg_shard_t> >::iterator q = missing_loc.find(soid);
if (q == missing_loc.end()) {
if (missing_loc.is_unfound(soid)) {
dout(7) << "pull " << soid
<< " v " << v
<< " but it is unfound" << dendl;
@ -8368,10 +8376,6 @@ void ReplicatedPG::_applied_recovered_object_replica()
void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
{
dout(10) << "got missing " << oid << " v " << v << dendl;
if (pg_log.get_missing().is_missing(oid, v)) {
if (is_primary())
missing_loc.erase(oid);
}
pg_log.recover_got(oid, v, info);
if (pg_log.get_log().complete_to != pg_log.get_log().log.end()) {
dout(10) << "last_complete now " << info.last_complete
@ -8498,18 +8502,10 @@ void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
{
assert(recovering.count(soid));
recovering.erase(soid);
map<hobject_t,set<pg_shard_t> >::iterator p = missing_loc.find(soid);
if (p != missing_loc.end()) {
dout(0) << "_failed_push " << soid << " from shard " << from
<< ", reps on " << p->second << dendl;
p->second.erase(from); // forget about this (bad) peer replica
if (p->second.empty())
missing_loc.erase(p);
} else {
dout(0) << "_failed_push " << soid << " from shard " << from
<< " but not in missing_loc ???" << dendl;
}
missing_loc.remove_location(soid, from);
dout(0) << "_failed_push " << soid << " from shard " << from
<< ", reps on " << missing_loc.get_locations(soid)
<< " unfound? " << missing_loc.is_unfound(soid) << dendl;
finish_recovery_op(soid); // close out this attempt,
}
@ -8575,8 +8571,8 @@ ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
// Wake anyone waiting for this object. Now that it's been marked as lost,
// we will just return an error code.
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(oid);
if (wmo != waiting_for_missing_object.end()) {
waiting_for_unreadable_object.find(oid);
if (wmo != waiting_for_unreadable_object.end()) {
requeue_ops(wmo->second);
}
@ -8629,7 +8625,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
map<hobject_t, pg_missing_t::item>::const_iterator mend = missing.missing.end();
while (m != mend) {
const hobject_t &oid(m->first);
if (missing_loc.find(oid) != missing_loc.end()) {
if (!missing_loc.is_unfound(oid)) {
// We only care about unfound objects
++m;
continue;
@ -8661,6 +8657,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
// we are now missing the new version; recovery code will sort it out.
++m;
pg_log.revise_need(oid, info.last_update);
missing_loc.revise_need(oid, info.last_update);
break;
}
/** fall-thru **/
@ -8679,6 +8676,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
assert(0 == "not implemented.. tho i'm not sure how useful it really would be.");
}
pg_log.missing_rm(m++);
missing_loc.recovered(oid);
}
break;
@ -8885,9 +8883,9 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
// requeue object waiters
if (is_primary()) {
requeue_object_waiters(waiting_for_missing_object);
requeue_object_waiters(waiting_for_unreadable_object);
} else {
waiting_for_missing_object.clear();
waiting_for_unreadable_object.clear();
}
for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
@ -8948,7 +8946,6 @@ void ReplicatedPG::on_pool_change()
void ReplicatedPG::_clear_recovery_state()
{
missing_loc.clear();
missing_loc_sources.clear();
#ifdef DEBUG_RECOVERY_OIDS
recovering_oids.clear();
#endif
@ -8981,44 +8978,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
* check that any peers we are planning to (or currently) pulling
* objects from are dealt with.
*/
set<pg_shard_t> now_down;
for (set<pg_shard_t>::iterator p = missing_loc_sources.begin();
p != missing_loc_sources.end();
) {
if (osdmap->is_up(p->osd)) {
++p;
continue;
}
dout(10) << "check_recovery_sources source osd." << *p << " now down" << dendl;
now_down.insert(*p);
missing_loc_sources.erase(p++);
}
missing_loc.check_recovery_sources(osdmap);
pgbackend->check_recovery_sources(osdmap);
if (now_down.empty()) {
dout(10) << "check_recovery_sources no source osds (" << missing_loc_sources << ") went down" << dendl;
} else {
dout(10) << "check_recovery_sources sources osds " << now_down << " now down, remaining sources are "
<< missing_loc_sources << dendl;
// filter missing_loc
map<hobject_t, set<pg_shard_t> >::iterator p = missing_loc.begin();
while (p != missing_loc.end()) {
set<pg_shard_t>::iterator q = p->second.begin();
while (q != p->second.end())
if (now_down.count(*q)) {
p->second.erase(q++);
} else {
assert(missing_loc_sources.count(*q));
++q;
}
if (p->second.empty())
missing_loc.erase(p++);
else
++p;
}
}
for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
i != peer_log_requested.end();
) {
@ -9041,6 +9003,45 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
}
}
}
void PG::MissingLoc::check_recovery_sources(const OSDMapRef osdmap)
{
set<pg_shard_t> now_down;
for (set<pg_shard_t>::iterator p = missing_loc_sources.begin();
p != missing_loc_sources.end();
) {
if (osdmap->is_up(p->osd)) {
++p;
continue;
}
dout(10) << "check_recovery_sources source osd." << *p << " now down" << dendl;
now_down.insert(*p);
missing_loc_sources.erase(p++);
}
if (now_down.empty()) {
dout(10) << "check_recovery_sources no source osds (" << missing_loc_sources << ") went down" << dendl;
} else {
dout(10) << "check_recovery_sources sources osds " << now_down << " now down, remaining sources are "
<< missing_loc_sources << dendl;
// filter missing_loc
map<hobject_t, set<pg_shard_t> >::iterator p = missing_loc.begin();
while (p != missing_loc.end()) {
set<pg_shard_t>::iterator q = p->second.begin();
while (q != p->second.end())
if (now_down.count(*q)) {
p->second.erase(q++);
} else {
++q;
}
if (p->second.empty())
missing_loc.erase(p++);
else
++p;
}
}
}
bool ReplicatedPG::start_recovery_ops(
@ -9124,6 +9125,12 @@ bool ReplicatedPG::start_recovery_ops(
assert(recovering.empty());
assert(recovery_ops_active == 0);
dout(10) << __func__ << " needs_recovery: "
<< missing_loc.get_needs_recovery()
<< dendl;
dout(10) << __func__ << " missing_loc: "
<< missing_loc.get_missing_locs()
<< dendl;
int unfound = get_num_unfound();
if (unfound) {
dout(10) << " still have " << unfound << " unfound" << dendl;
@ -9221,7 +9228,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
eversion_t need = item.need;
bool unfound = (missing_loc.find(soid) == missing_loc.end());
bool unfound = missing_loc.is_unfound(soid);
dout(10) << "recover_primary "
<< soid << " " << item.need
@ -9262,6 +9269,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
t->setattr(coll, soid, OI_ATTR, b2);
recover_got(soid, latest->version);
missing_loc.add_location(soid, pg_whoami);
++active_pushes;
@ -9289,16 +9297,16 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
eversion_t alternate_need = latest->reverting_to;
dout(10) << " need to pull prior_version " << alternate_need << " for revert " << item << dendl;
set<pg_shard_t>& loc = missing_loc[soid];
for (map<pg_shard_t, pg_missing_t>::iterator p = peer_missing.begin();
p != peer_missing.end();
++p)
if (p->second.is_missing(soid, need) &&
p->second.missing[soid].have == alternate_need) {
missing_loc_sources.insert(p->first);
loc.insert(p->first);
missing_loc.add_location(soid, p->first);
}
dout(10) << " will pull " << alternate_need << " or " << need << " from one of " << loc << dendl;
dout(10) << " will pull " << alternate_need << " or " << need
<< " from one of " << missing_loc.get_locations(soid)
<< dendl;
unfound = false;
}
}
@ -9351,6 +9359,7 @@ int ReplicatedPG::prep_object_replica_pushes(
ObjectContextRef obc = get_object_context(soid, false);
if (!obc) {
pg_log.missing_add(soid, v, eversion_t());
missing_loc.remove_location(soid, pg_whoami);
bool uhoh = true;
assert(actingbackfill.size() > 0);
for (set<pg_shard_t>::iterator i = actingbackfill.begin();
@ -9359,8 +9368,7 @@ int ReplicatedPG::prep_object_replica_pushes(
if (*i == get_primary()) continue;
pg_shard_t peer = *i;
if (!peer_missing[peer].is_missing(soid, v)) {
missing_loc[soid].insert(peer);
missing_loc_sources.insert(peer);
missing_loc.add_location(soid, peer);
dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
<< ", there should be a copy on shard " << peer << dendl;
uhoh = false;
@ -9370,7 +9378,8 @@ int ReplicatedPG::prep_object_replica_pushes(
osd->clog.error() << info.pgid << " missing primary copy of " << soid << ", unfound\n";
else
osd->clog.error() << info.pgid << " missing primary copy of " << soid
<< ", will try copies on " << missing_loc[soid] << "\n";
<< ", will try copies on " << missing_loc.get_locations(soid)
<< "\n";
return 0;
}
@ -9468,7 +9477,7 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
}
if (pg_log.get_missing().is_missing(soid)) {
if (missing_loc.find(soid) == missing_loc.end())
if (missing_loc.is_unfound(soid))
dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
else
dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;

View File

@ -304,7 +304,7 @@ public:
std::string gen_dbg_prefix() const { return gen_prefix(); }
const map<hobject_t, set<pg_shard_t> > &get_missing_loc_shards() const {
return missing_loc;
return missing_loc.get_missing_locs();
}
const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
return peer_missing;
@ -1272,8 +1272,12 @@ public:
bool same_for_modify_since(epoch_t e);
bool same_for_rep_modify_since(epoch_t e);
bool is_missing_object(const hobject_t& oid);
void wait_for_missing_object(const hobject_t& oid, OpRequestRef op);
bool is_missing_object(const hobject_t& oid) const;
bool is_unreadable_object(const hobject_t &oid) const {
return is_missing_object(oid) ||
!missing_loc.readable_with_acting(oid, actingset);
}
void wait_for_unreadable_object(const hobject_t& oid, OpRequestRef op);
void wait_for_all_missing(OpRequestRef op);
bool is_degraded_object(const hobject_t& oid);