mirror of
https://github.com/ceph/ceph
synced 2025-02-23 19:17:37 +00:00
osd: use peering events for forced recovery
The mgr code is updated to send spg_t's instead of pg_t's (and is slightly refactored/cleaned). The PG events are added to the Primary state, unless we're also in the Clean substate, in which case they are ignored. Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
parent
bd8d198c07
commit
fe5e361467
@ -33,18 +33,18 @@ static const int OFR_CANCEL = 4;
|
||||
|
||||
struct MOSDForceRecovery : public Message {
|
||||
|
||||
static const int HEAD_VERSION = 1;
|
||||
static const int COMPAT_VERSION = 1;
|
||||
static const int HEAD_VERSION = 2;
|
||||
static const int COMPAT_VERSION = 2;
|
||||
|
||||
uuid_d fsid;
|
||||
vector<pg_t> forced_pgs;
|
||||
vector<spg_t> forced_pgs;
|
||||
uint8_t options = 0;
|
||||
|
||||
MOSDForceRecovery() : Message(MSG_OSD_FORCE_RECOVERY, HEAD_VERSION, COMPAT_VERSION) {}
|
||||
MOSDForceRecovery(const uuid_d& f, char opts) :
|
||||
Message(MSG_OSD_FORCE_RECOVERY, HEAD_VERSION, COMPAT_VERSION),
|
||||
fsid(f), options(opts) {}
|
||||
MOSDForceRecovery(const uuid_d& f, vector<pg_t>& pgs, char opts) :
|
||||
MOSDForceRecovery(const uuid_d& f, vector<spg_t>& pgs, char opts) :
|
||||
Message(MSG_OSD_FORCE_RECOVERY, HEAD_VERSION, COMPAT_VERSION),
|
||||
fsid(f), forced_pgs(pgs), options(opts) {}
|
||||
private:
|
||||
@ -69,12 +69,38 @@ public:
|
||||
|
||||
void encode_payload(uint64_t features) {
|
||||
using ceph::encode;
|
||||
if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
|
||||
header.version = 1;
|
||||
header.compat_version = 1;
|
||||
vector<pg_t> pgs;
|
||||
for (auto pgid : forced_pgs) {
|
||||
pgs.push_back(pgid.pgid);
|
||||
}
|
||||
encode(fsid, payload);
|
||||
encode(pgs, payload);
|
||||
encode(options, payload);
|
||||
return;
|
||||
}
|
||||
header.version = HEAD_VERSION;
|
||||
header.compat_version = COMPAT_VERSION;
|
||||
encode(fsid, payload);
|
||||
encode(forced_pgs, payload);
|
||||
encode(options, payload);
|
||||
}
|
||||
void decode_payload() {
|
||||
bufferlist::iterator p = payload.begin();
|
||||
if (header.version == 1) {
|
||||
vector<pg_t> pgs;
|
||||
decode(fsid, p);
|
||||
decode(pgs, p);
|
||||
decode(options, p);
|
||||
for (auto pg : pgs) {
|
||||
// note: this only works with replicated pools. if a pre-mimic mon
|
||||
// tries to force a mimic+ osd on an ec pool it will not work.
|
||||
forced_pgs.push_back(spg_t(pg));
|
||||
}
|
||||
return;
|
||||
}
|
||||
decode(fsid, p);
|
||||
decode(forced_pgs, p);
|
||||
decode(options, p);
|
||||
|
@ -1213,7 +1213,6 @@ bool DaemonServer::handle_command(MCommand *m)
|
||||
prefix == "pg cancel-force-backfill") {
|
||||
string forceop = prefix.substr(3, string::npos);
|
||||
list<pg_t> parsed_pgs;
|
||||
map<int, list<pg_t> > osdpgs;
|
||||
|
||||
// figure out actual op just once
|
||||
int actual_op = 0;
|
||||
@ -1303,20 +1302,6 @@ bool DaemonServer::handle_command(MCommand *m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// group pgs to process by osd
|
||||
for (auto& pgid : parsed_pgs) {
|
||||
auto workit = pg_map.pg_stat.find(pgid);
|
||||
if (workit != pg_map.pg_stat.end()) {
|
||||
pg_stat_t workpg = workit->second;
|
||||
set<int32_t> osds(workpg.up.begin(), workpg.up.end());
|
||||
osds.insert(workpg.acting.begin(), workpg.acting.end());
|
||||
for (auto i : osds) {
|
||||
osdpgs[i].push_back(pgid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@ -1328,24 +1313,35 @@ bool DaemonServer::handle_command(MCommand *m)
|
||||
r = 0;
|
||||
}
|
||||
|
||||
// optimize the command -> messages conversion, use only one message per distinct OSD
|
||||
// optimize the command -> messages conversion, use only one
|
||||
// message per distinct OSD
|
||||
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
|
||||
for (auto& i : osdpgs) {
|
||||
if (osdmap.is_up(i.first)) {
|
||||
vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
|
||||
auto p = osd_cons.find(i.first);
|
||||
if (p == osd_cons.end()) {
|
||||
ss << "osd." << i.first << " is not currently connected";
|
||||
r = -EAGAIN;
|
||||
continue;
|
||||
// group pgs to process by osd
|
||||
map<int, vector<spg_t>> osdpgs;
|
||||
for (auto& pgid : parsed_pgs) {
|
||||
int primary;
|
||||
spg_t spg;
|
||||
if (osdmap.get_primary_shard(pgid, &primary, &spg)) {
|
||||
osdpgs[primary].push_back(spg);
|
||||
}
|
||||
for (auto& con : p->second) {
|
||||
con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
|
||||
}
|
||||
ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
|
||||
}
|
||||
}
|
||||
});
|
||||
for (auto& i : osdpgs) {
|
||||
if (osdmap.is_up(i.first)) {
|
||||
auto p = osd_cons.find(i.first);
|
||||
if (p == osd_cons.end()) {
|
||||
ss << "osd." << i.first << " is not currently connected";
|
||||
r = -EAGAIN;
|
||||
continue;
|
||||
}
|
||||
for (auto& con : p->second) {
|
||||
con->send_message(
|
||||
new MOSDForceRecovery(monc->get_fsid(), i.second, actual_op));
|
||||
}
|
||||
ss << "instructing pg(s) " << i.second << " on osd." << i.first
|
||||
<< " to " << forceop << "; ";
|
||||
}
|
||||
}
|
||||
});
|
||||
ss << std::endl;
|
||||
cmdctx->reply(r, ss);
|
||||
return true;
|
||||
|
@ -6542,6 +6542,9 @@ void OSD::ms_fast_dispatch(Message *m)
|
||||
case MSG_COMMAND:
|
||||
handle_command(static_cast<MCommand*>(m));
|
||||
return;
|
||||
case MSG_OSD_FORCE_RECOVERY:
|
||||
handle_fast_force_recovery(static_cast<MOSDForceRecovery*>(m));
|
||||
return;
|
||||
|
||||
case MSG_OSD_PG_CREATE2:
|
||||
return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
|
||||
@ -6775,10 +6778,6 @@ void OSD::_dispatch(Message *m)
|
||||
handle_scrub(static_cast<MOSDScrub*>(m));
|
||||
break;
|
||||
|
||||
case MSG_OSD_FORCE_RECOVERY:
|
||||
handle_force_recovery(m);
|
||||
break;
|
||||
|
||||
// -- need OSDMap --
|
||||
|
||||
case MSG_OSD_PG_CREATE:
|
||||
@ -8596,32 +8595,46 @@ void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
|
||||
}
|
||||
}
|
||||
|
||||
void OSD::handle_force_recovery(Message *m)
|
||||
void OSD::handle_fast_force_recovery(MOSDForceRecovery *m)
|
||||
{
|
||||
MOSDForceRecovery *msg = static_cast<MOSDForceRecovery*>(m);
|
||||
assert(msg->get_type() == MSG_OSD_FORCE_RECOVERY);
|
||||
|
||||
vector<PGRef> local_pgs;
|
||||
local_pgs.reserve(msg->forced_pgs.size());
|
||||
|
||||
{
|
||||
RWLock::RLocker l(pg_map_lock);
|
||||
for (auto& i : msg->forced_pgs) {
|
||||
spg_t locpg;
|
||||
if (osdmap->get_primary_shard(i, &locpg)) {
|
||||
auto pg_map_entry = pg_map.find(locpg);
|
||||
if (pg_map_entry != pg_map.end()) {
|
||||
local_pgs.push_back(pg_map_entry->second);
|
||||
}
|
||||
dout(10) << __func__ << " " << *m << dendl;
|
||||
epoch_t epoch = get_osdmap()->get_epoch();
|
||||
for (auto pgid : m->forced_pgs) {
|
||||
if (m->options & OFR_BACKFILL) {
|
||||
if (m->options & OFR_CANCEL) {
|
||||
enqueue_peering_evt(
|
||||
pgid,
|
||||
PGPeeringEventRef(
|
||||
std::make_shared<PGPeeringEvent>(
|
||||
epoch, epoch,
|
||||
PG::UnsetForceBackfill())));
|
||||
} else {
|
||||
enqueue_peering_evt(
|
||||
pgid,
|
||||
PGPeeringEventRef(
|
||||
std::make_shared<PGPeeringEvent>(
|
||||
epoch, epoch,
|
||||
PG::SetForceBackfill())));
|
||||
}
|
||||
} else if (m->options & OFR_RECOVERY) {
|
||||
if (m->options & OFR_CANCEL) {
|
||||
enqueue_peering_evt(
|
||||
pgid,
|
||||
PGPeeringEventRef(
|
||||
std::make_shared<PGPeeringEvent>(
|
||||
epoch, epoch,
|
||||
PG::UnsetForceRecovery())));
|
||||
} else {
|
||||
enqueue_peering_evt(
|
||||
pgid,
|
||||
PGPeeringEventRef(
|
||||
std::make_shared<PGPeeringEvent>(
|
||||
epoch, epoch,
|
||||
PG::SetForceRecovery())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (local_pgs.size()) {
|
||||
service.adjust_pg_priorities(local_pgs, msg->options);
|
||||
}
|
||||
|
||||
msg->put();
|
||||
m->put();
|
||||
}
|
||||
|
||||
void OSD::handle_pg_query_nopg(const MQuery& q)
|
||||
@ -8728,37 +8741,6 @@ bool OSDService::_recover_now(uint64_t *available_pushes)
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void OSDService::adjust_pg_priorities(const vector<PGRef>& pgs, int newflags)
|
||||
{
|
||||
if (!pgs.size() || !(newflags & (OFR_BACKFILL | OFR_RECOVERY))) {
|
||||
return;
|
||||
}
|
||||
set<spg_t> did;
|
||||
if (newflags & OFR_BACKFILL) {
|
||||
for (auto& pg : pgs) {
|
||||
if (pg->set_force_backfill(!(newflags & OFR_CANCEL))) {
|
||||
did.insert(pg->pg_id);
|
||||
}
|
||||
}
|
||||
} else if (newflags & OFR_RECOVERY) {
|
||||
for (auto& pg : pgs) {
|
||||
if (pg->set_force_recovery(!(newflags & OFR_CANCEL))) {
|
||||
did.insert(pg->pg_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (did.empty()) {
|
||||
dout(10) << __func__ << " " << ((newflags & OFR_CANCEL) ? "cleared" : "set")
|
||||
<< " force_" << ((newflags & OFR_BACKFILL) ? "backfill" : "recovery")
|
||||
<< " on no pgs" << dendl;
|
||||
} else {
|
||||
dout(10) << __func__ << " " << ((newflags & OFR_CANCEL) ? "cleared" : "set")
|
||||
<< " force_" << ((newflags & OFR_BACKFILL) ? "backfill" : "recovery")
|
||||
<< " on " << did << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
void OSD::do_recovery(
|
||||
PG *pg, epoch_t queued, uint64_t reserved_pushes,
|
||||
ThreadPool::TPHandle &handle)
|
||||
|
@ -253,6 +253,7 @@ class MOSDPGQuery;
|
||||
class MOSDPGNotify;
|
||||
class MOSDPGInfo;
|
||||
class MOSDPGRemove;
|
||||
class MOSDForceRecovery;
|
||||
|
||||
class OSD;
|
||||
|
||||
@ -870,8 +871,6 @@ public:
|
||||
_queue_for_recovery(make_pair(queued, pg), reserved_pushes);
|
||||
}
|
||||
|
||||
void adjust_pg_priorities(const vector<PGRef>& pgs, int newflags);
|
||||
|
||||
// osd map cache (past osd maps)
|
||||
Mutex map_cache_lock;
|
||||
SharedLRU<epoch_t, const OSDMap> map_cache;
|
||||
@ -2042,7 +2041,7 @@ protected:
|
||||
|
||||
PGRef handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info);
|
||||
|
||||
void handle_force_recovery(Message *m);
|
||||
void handle_fast_force_recovery(MOSDForceRecovery *m);
|
||||
|
||||
// -- commands --
|
||||
struct Command {
|
||||
@ -2123,6 +2122,7 @@ private:
|
||||
case CEPH_MSG_PING:
|
||||
case CEPH_MSG_OSD_OP:
|
||||
case CEPH_MSG_OSD_BACKOFF:
|
||||
case MSG_OSD_FORCE_RECOVERY:
|
||||
case MSG_MON_COMMAND:
|
||||
case MSG_COMMAND:
|
||||
case MSG_OSD_PG_CREATE2:
|
||||
|
@ -2284,52 +2284,44 @@ void PG::mark_clean()
|
||||
bool PG::set_force_recovery(bool b)
|
||||
{
|
||||
bool did = false;
|
||||
lock();
|
||||
if (!deleting) {
|
||||
if (b) {
|
||||
if (!(state & PG_STATE_FORCED_RECOVERY) &&
|
||||
(state & (PG_STATE_DEGRADED | // XXX: Will this check be messed up?
|
||||
PG_STATE_RECOVERY_WAIT |
|
||||
PG_STATE_RECOVERING))) {
|
||||
dout(20) << __func__ << " set" << dendl;
|
||||
state_set(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
} else if (state & PG_STATE_FORCED_RECOVERY) {
|
||||
dout(20) << __func__ << " clear" << dendl;
|
||||
state_clear(PG_STATE_FORCED_RECOVERY);
|
||||
if (b) {
|
||||
if (!(state & PG_STATE_FORCED_RECOVERY) &&
|
||||
(state & (PG_STATE_DEGRADED |
|
||||
PG_STATE_RECOVERY_WAIT |
|
||||
PG_STATE_RECOVERING))) {
|
||||
dout(20) << __func__ << " set" << dendl;
|
||||
state_set(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
} else if (state & PG_STATE_FORCED_RECOVERY) {
|
||||
dout(20) << __func__ << " clear" << dendl;
|
||||
state_clear(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
unlock();
|
||||
return did;
|
||||
}
|
||||
|
||||
bool PG::set_force_backfill(bool b)
|
||||
{
|
||||
bool did = false;
|
||||
lock();
|
||||
if (!deleting) {
|
||||
if (b) {
|
||||
if (!(state & PG_STATE_FORCED_RECOVERY) &&
|
||||
(state & (PG_STATE_DEGRADED | // XXX: Will this check be messed up?
|
||||
PG_STATE_BACKFILL_WAIT |
|
||||
PG_STATE_BACKFILLING))) {
|
||||
dout(10) << __func__ << " set" << dendl;
|
||||
state_set(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
} else if (state & PG_STATE_FORCED_RECOVERY) {
|
||||
dout(10) << __func__ << " clear" << dendl;
|
||||
state_clear(PG_STATE_FORCED_RECOVERY);
|
||||
if (b) {
|
||||
if (!(state & PG_STATE_FORCED_RECOVERY) &&
|
||||
(state & (PG_STATE_DEGRADED |
|
||||
PG_STATE_BACKFILL_WAIT |
|
||||
PG_STATE_BACKFILLING))) {
|
||||
dout(10) << __func__ << " set" << dendl;
|
||||
state_set(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
} else if (state & PG_STATE_FORCED_RECOVERY) {
|
||||
dout(10) << __func__ << " clear" << dendl;
|
||||
state_clear(PG_STATE_FORCED_RECOVERY);
|
||||
publish_stats_to_osd();
|
||||
did = true;
|
||||
}
|
||||
unlock();
|
||||
return did;
|
||||
}
|
||||
|
||||
@ -6836,6 +6828,38 @@ boost::statechart::result PG::RecoveryState::Primary::react(const ActMap&)
|
||||
return discard_event();
|
||||
}
|
||||
|
||||
boost::statechart::result PG::RecoveryState::Primary::react(
|
||||
const SetForceRecovery&)
|
||||
{
|
||||
PG *pg = context< RecoveryMachine >().pg;
|
||||
pg->set_force_recovery(true);
|
||||
return discard_event();
|
||||
}
|
||||
|
||||
boost::statechart::result PG::RecoveryState::Primary::react(
|
||||
const UnsetForceRecovery&)
|
||||
{
|
||||
PG *pg = context< RecoveryMachine >().pg;
|
||||
pg->set_force_recovery(false);
|
||||
return discard_event();
|
||||
}
|
||||
|
||||
boost::statechart::result PG::RecoveryState::Primary::react(
|
||||
const SetForceBackfill&)
|
||||
{
|
||||
PG *pg = context< RecoveryMachine >().pg;
|
||||
pg->set_force_backfill(true);
|
||||
return discard_event();
|
||||
}
|
||||
|
||||
boost::statechart::result PG::RecoveryState::Primary::react(
|
||||
const UnsetForceBackfill&)
|
||||
{
|
||||
PG *pg = context< RecoveryMachine >().pg;
|
||||
pg->set_force_backfill(false);
|
||||
return discard_event();
|
||||
}
|
||||
|
||||
void PG::RecoveryState::Primary::exit()
|
||||
{
|
||||
context< RecoveryMachine >().log_exit(state_name, enter_time);
|
||||
@ -7734,7 +7758,7 @@ PG::RecoveryState::Clean::Clean(my_context ctx)
|
||||
if (pg->is_active()) {
|
||||
pg->mark_clean();
|
||||
}
|
||||
|
||||
pg->state_clear(PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL);
|
||||
pg->share_pg_info();
|
||||
pg->publish_stats_to_osd();
|
||||
pg->requeue_ops(pg->waiting_for_clean_to_primary_repair);
|
||||
|
28
src/osd/PG.h
28
src/osd/PG.h
@ -1889,6 +1889,11 @@ protected:
|
||||
TrivialEvent(DeleteReserved)
|
||||
TrivialEvent(DeleteInterrupted)
|
||||
|
||||
TrivialEvent(SetForceRecovery)
|
||||
TrivialEvent(UnsetForceRecovery)
|
||||
TrivialEvent(SetForceBackfill)
|
||||
TrivialEvent(UnsetForceBackfill)
|
||||
|
||||
/* Encapsulates PG recovery process */
|
||||
class RecoveryState {
|
||||
void start_handle(RecoveryCtx *new_ctx);
|
||||
@ -2060,7 +2065,11 @@ protected:
|
||||
boost::statechart::custom_reaction< AdvMap >,
|
||||
boost::statechart::custom_reaction< NullEvt >,
|
||||
boost::statechart::custom_reaction< IntervalFlush >,
|
||||
boost::statechart::transition< boost::statechart::event_base, Crashed >
|
||||
boost::statechart::transition< boost::statechart::event_base, Crashed >,
|
||||
boost::statechart::custom_reaction<SetForceRecovery>,
|
||||
boost::statechart::custom_reaction<UnsetForceRecovery>,
|
||||
boost::statechart::custom_reaction<SetForceBackfill>,
|
||||
boost::statechart::custom_reaction<UnsetForceBackfill>
|
||||
> reactions;
|
||||
boost::statechart::result react(const QueryState& q);
|
||||
boost::statechart::result react(const AdvMap&);
|
||||
@ -2095,10 +2104,18 @@ protected:
|
||||
typedef boost::mpl::list <
|
||||
boost::statechart::custom_reaction< ActMap >,
|
||||
boost::statechart::custom_reaction< MNotifyRec >,
|
||||
boost::statechart::transition< NeedActingChange, WaitActingChange >
|
||||
boost::statechart::transition< NeedActingChange, WaitActingChange >,
|
||||
boost::statechart::custom_reaction<SetForceRecovery>,
|
||||
boost::statechart::custom_reaction<UnsetForceRecovery>,
|
||||
boost::statechart::custom_reaction<SetForceBackfill>,
|
||||
boost::statechart::custom_reaction<UnsetForceBackfill>
|
||||
> reactions;
|
||||
boost::statechart::result react(const ActMap&);
|
||||
boost::statechart::result react(const MNotifyRec&);
|
||||
boost::statechart::result react(const SetForceRecovery&);
|
||||
boost::statechart::result react(const UnsetForceRecovery&);
|
||||
boost::statechart::result react(const SetForceBackfill&);
|
||||
boost::statechart::result react(const UnsetForceBackfill&);
|
||||
};
|
||||
|
||||
struct WaitActingChange : boost::statechart::state< WaitActingChange, Primary>,
|
||||
@ -2202,10 +2219,15 @@ protected:
|
||||
|
||||
struct Clean : boost::statechart::state< Clean, Active >, NamedState {
|
||||
typedef boost::mpl::list<
|
||||
boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >
|
||||
boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
|
||||
boost::statechart::custom_reaction<SetForceRecovery>,
|
||||
boost::statechart::custom_reaction<SetForceBackfill>
|
||||
> reactions;
|
||||
explicit Clean(my_context ctx);
|
||||
void exit();
|
||||
boost::statechart::result react(const boost::statechart::event_base&) {
|
||||
return discard_event();
|
||||
}
|
||||
};
|
||||
|
||||
struct Recovered : boost::statechart::state< Recovered, Active >, NamedState {
|
||||
|
Loading…
Reference in New Issue
Block a user