osd: remove old pg removal infrastructure

Another queue bites the dust! \o/

Signed-off-by: Sage Weil <sage@redhat.com>
Sage Weil 2017-12-11 09:10:59 -06:00
parent d77dd6352e
commit 285654c2bc
4 changed files with 27 additions and 492 deletions

@@ -2064,12 +2064,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
remove_wq(
cct,
store,
cct->_conf->osd_remove_thread_timeout,
cct->_conf->osd_remove_thread_suicide_timeout,
&disk_tp),
service(this)
{
monc->set_messenger(client_messenger);
@@ -3836,64 +3830,6 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
}
}
OSD::res_result OSD::_try_resurrect_pg(
OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state)
{
assert(resurrected);
assert(old_pg_state);
// find nearest ancestor
DeletingStateRef df;
spg_t cur(pgid);
while (true) {
df = service.deleting_pgs.lookup(cur);
if (df)
break;
if (!cur.ps())
break;
cur = cur.get_parent();
}
if (!df)
return RES_NONE; // good to go
df->old_pg_state->lock();
OSDMapRef create_map = df->old_pg_state->get_osdmap();
df->old_pg_state->unlock();
set<spg_t> children;
if (cur == pgid) {
if (df->try_stop_deletion()) {
dout(10) << __func__ << ": halted deletion on pg " << pgid << dendl;
*resurrected = cur;
*old_pg_state = df->old_pg_state;
service.deleting_pgs.remove(pgid); // PG is no longer being removed!
return RES_SELF;
} else {
// raced, ensure we don't see DeletingStateRef when we try to
// delete this pg
service.deleting_pgs.remove(pgid);
return RES_NONE;
}
} else if (cur.is_split(create_map->get_pg_num(cur.pool()),
curmap->get_pg_num(cur.pool()),
&children) &&
children.count(pgid)) {
if (df->try_stop_deletion()) {
dout(10) << __func__ << ": halted deletion on ancestor pg " << pgid
<< dendl;
*resurrected = cur;
*old_pg_state = df->old_pg_state;
service.deleting_pgs.remove(cur); // PG is no longer being removed!
return RES_PARENT;
} else {
/* this is not a problem, failing to cancel proves that all objects
* have been removed, so no hobject_t overlap is possible
*/
return RES_NONE;
}
}
return RES_NONE;
}
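A note on the ancestor walk above: it relies on the arithmetic of PG splits, where a child's seed is the parent's seed with one extra high bit set, so get_parent() strips the highest set bit and the loop is guaranteed to bottom out at seed 0. A minimal standalone sketch of that arithmetic (my own illustration of the usual split semantics; parent_seed is a hypothetical helper, not Ceph API):

#include <cassert>
#include <cstdint>

// e.g. when pg_num grows from 8 to 16, pg 5 splits into 5 and 13 (5 + 8);
// child 13 = 0b1101 reaches parent 5 = 0b0101 by clearing the top set bit.
static uint32_t parent_seed(uint32_t seed) {
  assert(seed != 0);                         // seed 0 has no parent
  unsigned msb = 31 - __builtin_clz(seed);   // index of highest set bit
  return seed & ~(1u << msb);                // clear it -> ancestor's seed
}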
PG *OSD::_create_lock_pg(
OSDMapRef createmap,
spg_t pgid,
@@ -4111,129 +4047,41 @@ int OSD::handle_pg_peering_evt(
if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
return -EAGAIN;
}
// do we need to resurrect a deleting pg?
spg_t resurrected;
PGRef old_pg_state;
res_result result = _try_resurrect_pg(
service.get_osdmap(),
pgid,
&resurrected,
&old_pg_state);
PG::RecoveryCtx rctx = create_context();
switch (result) {
case RES_NONE: {
const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
store->get_type() != "bluestore") {
clog->warn() << "pg " << pgid
<< " is at risk of silent data corruption: "
<< "the pool allows ec overwrites but is not stored in "
<< "bluestore, so deep scrubbing will not detect bitrot";
}
PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
PG::_init(*rctx.transaction, pgid, pp);
int role = osdmap->calc_pg_role(whoami, acting, acting.size());
if (!pp->is_replicated() && role != pgid.shard)
role = -1;
pg = _create_lock_pg(
get_map(epoch),
pgid, false, false,
role,
up, up_primary,
acting, acting_primary,
history, pi,
*rctx.transaction);
pg->handle_create(&rctx);
dispatch_context(rctx, pg, osdmap);
dout(10) << *pg << " is new" << dendl;
pg->queue_peering_event(evt);
wake_pg_waiters(pg);
pg->unlock();
return 0;
const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
store->get_type() != "bluestore") {
clog->warn() << "pg " << pgid
<< " is at risk of silent data corruption: "
<< "the pool allows ec overwrites but is not stored in "
<< "bluestore, so deep scrubbing will not detect bitrot";
}
case RES_SELF: {
old_pg_state->lock();
OSDMapRef old_osd_map = old_pg_state->get_osdmap();
int old_role = old_pg_state->get_role();
vector<int> old_up = old_pg_state->get_up();
int old_up_primary = old_pg_state->get_up_primary();
vector<int> old_acting = old_pg_state->get_acting();
int old_primary = old_pg_state->get_acting_primary();
pg_history_t old_history = old_pg_state->get_history();
PastIntervals old_past_intervals = old_pg_state->get_past_intervals();
old_pg_state->unlock();
pg = _create_lock_pg(
old_osd_map,
resurrected,
false,
true,
old_role,
old_up,
old_up_primary,
old_acting,
old_primary,
old_history,
old_past_intervals,
*rctx.transaction);
pg->handle_create(&rctx);
dispatch_context(rctx, pg, osdmap);
PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
PG::_init(*rctx.transaction, pgid, pp);
dout(10) << *pg << " is new (resurrected)" << dendl;
int role = osdmap->calc_pg_role(whoami, acting, acting.size());
if (!pp->is_replicated() && role != pgid.shard)
role = -1;
pg->queue_peering_event(evt);
wake_pg_waiters(pg);
pg->unlock();
return 0;
}
case RES_PARENT: {
assert(old_pg_state);
old_pg_state->lock();
OSDMapRef old_osd_map = old_pg_state->get_osdmap();
int old_role = old_pg_state->get_role();
vector<int> old_up = old_pg_state->get_up();
int old_up_primary = old_pg_state->get_up_primary();
vector<int> old_acting = old_pg_state->get_acting();
int old_primary = old_pg_state->get_acting_primary();
pg_history_t old_history = old_pg_state->get_history();
PastIntervals old_past_intervals = old_pg_state->get_past_intervals();
old_pg_state->unlock();
PG *parent = _create_lock_pg(
old_osd_map,
resurrected,
false,
true,
old_role,
old_up,
old_up_primary,
old_acting,
old_primary,
old_history,
old_past_intervals,
*rctx.transaction
);
parent->handle_create(&rctx);
dispatch_context(rctx, parent, osdmap);
pg = _create_lock_pg(
get_map(epoch),
pgid, false, false,
role,
up, up_primary,
acting, acting_primary,
history, pi,
*rctx.transaction);
pg->handle_create(&rctx);
dispatch_context(rctx, pg, osdmap);
dout(10) << *parent << " is new" << dendl;
dout(10) << *pg << " is new" << dendl;
assert(service.splitting(pgid));
peering_wait_for_split[pgid].push_back(evt);
//parent->queue_peering_event(evt);
parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
wake_pg_waiters(parent);
parent->unlock();
return 0;
}
default:
assert(0);
return 0;
}
pg->queue_peering_event(evt);
wake_pg_waiters(pg);
pg->unlock();
return 0;
} else {
// already had it. did the mapping change?
if (epoch < pg->get_same_interval_since()) {
@@ -5081,7 +4929,6 @@ void OSD::tick_without_osd_lock()
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
logger->set(l_osd_missed_crc, buffer::get_missed_crc());
logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
// osd_lock is not being held, which means the OSD state
// might change when doing the monitor report
@@ -5346,113 +5193,6 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
ss << "Internal error - command=" << command;
}
// =========================================
bool remove_dir(
PGRef pg,
ObjectStore *store,
DeletingStateRef dstate,
bool *finished,
ThreadPool::TPHandle &handle)
{
CephContext *cct = pg->get_cct();
vector<ghobject_t> olist;
int64_t num = 0;
ObjectStore::Transaction t;
ghobject_t next;
handle.reset_tp_timeout();
store->collection_list(
pg->coll,
next,
ghobject_t::get_max(),
store->get_ideal_list_max(),
&olist,
&next);
generic_dout(10) << __func__ << " " << olist << dendl;
// default cont to true: this is safe because the caller (OSD::RemoveWQ::_process())
// will recheck the answer before it really goes on.
bool cont = true;
for (vector<ghobject_t>::iterator i = olist.begin();
i != olist.end();
++i) {
if (i->is_pgmeta())
continue;
pg->pg_remove_object(*i, &t);
if (++num >= cct->_conf->osd_target_transaction_size) {
C_SaferCond waiter;
store->queue_transaction(pg->osr.get(), std::move(t), &waiter);
cont = dstate->pause_clearing();
handle.suspend_tp_timeout();
waiter.wait();
handle.reset_tp_timeout();
if (cont)
cont = dstate->resume_clearing();
if (!cont)
return false;
t = ObjectStore::Transaction();
num = 0;
}
}
if (num) {
C_SaferCond waiter;
store->queue_transaction(pg->osr.get(), std::move(t), &waiter);
cont = dstate->pause_clearing();
handle.suspend_tp_timeout();
waiter.wait();
handle.reset_tp_timeout();
if (cont)
cont = dstate->resume_clearing();
}
// finished iff the listing reached the end of the collection
*finished = next.is_max();
return cont;
}
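remove_dir is an instance of a general batch-and-checkpoint shape: cap each transaction at osd_target_transaction_size operations, flush it synchronously, and give the cancellation path a chance to win at every batch boundary, so a resurrection never races a half-applied transaction. A generic sketch of that shape, with hypothetical names (not Ceph API):

#include <atomic>
#include <functional>
#include <vector>

// Returns false if the deletion was cancelled at a batch boundary.
bool batched_delete(const std::vector<int>& items,
                    size_t batch_size,   // plays the role of osd_target_transaction_size
                    std::atomic<bool>& cancelled,
                    const std::function<void(std::vector<int>&)>& commit) {
  std::vector<int> batch;
  for (int item : items) {
    batch.push_back(item);
    if (batch.size() >= batch_size) {
      commit(batch);             // synchronous flush, like waiter.wait() above
      batch.clear();
      if (cancelled.load())      // checkpoint, like pause/resume_clearing()
        return false;
    }
  }
  if (!batch.empty())
    commit(batch);               // flush the tail
  return true;
}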
void OSD::RemoveWQ::_process(
pair<PGRef, DeletingStateRef> item,
ThreadPool::TPHandle &handle)
{
FUNCTRACE(cct);
PGRef pg(item.first);
coll_t coll = coll_t(pg->pg_id);
pg->osr->flush();
bool finished = false;
if (!item.second->start_or_resume_clearing())
return;
bool cont = remove_dir(pg, store, item.second, &finished, handle);
if (!cont)
return;
if (!finished) {
if (item.second->pause_clearing())
queue_front(item);
return;
}
if (!item.second->start_deleting())
return;
ObjectStore::Transaction t;
PGLog::clear_info_log(pg->pg_id, &t);
if (cct->_conf->osd_inject_failure_on_pg_removal) {
generic_derr << "osd_inject_failure_on_pg_removal" << dendl;
_exit(1);
}
t.remove_collection(coll);
// We need the sequencer to stick around until the op is complete
store->queue_transaction(
pg->osr.get(),
std::move(t),
0, // onapplied
0, // oncommit
0, // onreadable sync
new ContainerContext<PGRef>(pg),
TrackedOpRef());
item.second->finish_deleting();
}
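Both _process here and _remove_pg below pin an object for the lifetime of an asynchronous transaction by handing queue_transaction a completion context whose only job is to hold a reference (ContainerContext<PGRef> above, ContainerContext<SequencerRef> below). A simplified sketch of that keep-alive idiom, assuming a Context base class along the lines of Ceph's:

#include <utility>

struct Context {                          // stand-in for Ceph's Context
  virtual void finish(int r) = 0;
  virtual ~Context() {}
};

template <typename T>
struct KeepAliveCtx : public Context {    // hypothetical; mirrors ContainerContext
  T held;                                 // e.g. a PGRef or SequencerRef
  explicit KeepAliveCtx(T h) : held(std::move(h)) {}
  void finish(int) override {}            // no work: deleting the context after
                                          // commit drops the reference, so the
                                          // object outlives the queued op
};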
// =========================================
void OSD::ms_handle_connect(Connection *con)
@@ -8061,7 +7801,6 @@ void OSD::consume_map()
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
}
void OSD::activate_map()
@@ -8925,11 +8664,6 @@ void OSD::handle_pg_query(OpRequestRef op)
dout(10) << " pg " << pgid << " dne" << dendl;
pg_info_t empty(spg_t(pgid.pgid, it->second.to));
/* This is racy, but that should be ok: if we complete the deletion
* before the pg is recreated, we'll just start it off backfilling
* instead of just empty */
if (service.deleting_pgs.lookup(pgid))
empty.set_last_backfill(hobject_t());
if (it->second.type == pg_query_t::LOG ||
it->second.type == pg_query_t::FULLLOG) {
ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
@@ -8996,41 +8730,6 @@ void OSD::handle_pg_remove(OpRequestRef op)
}
}
void OSD::_remove_pg(PG *pg)
{
ObjectStore::Transaction rmt;
// on_removal, which calls remove_watchers_and_notifies, and the erasure from
// the pg_map must be done together without unlocking the pg lock,
// to avoid racing with watcher cleanup in ms_handle_reset
// and handle_notify_timeout
pg->on_removal(&rmt);
service.cancel_pending_splits_for_parent(pg->pg_id);
int tr = store->queue_transaction(
pg->osr.get(), std::move(rmt), NULL,
new ContainerContext<
SequencerRef>(pg->osr));
assert(tr == 0);
DeletingStateRef deleting = service.deleting_pgs.lookup_or_create(
pg->pg_id,
make_pair(
pg->pg_id,
PGRef(pg))
);
remove_wq.queue(make_pair(PGRef(pg), deleting));
service.pg_remove_epoch(pg->pg_id);
// dereference from op_wq
//op_shardedwq.clear_pg_pointer(pg->pg_id);
// remove from map
pg_map.erase(pg->pg_id);
pg->put("PGMap"); // since we've taken it out of map
}
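The deleting_pgs registry used here via lookup_or_create(), and probed again by _try_resurrect_pg(), is a SharedPtrRegistry: a mutex-protected map of weak references keyed by spg_t, so an entry stays visible exactly as long as someone holds the DeletingStateRef. A simplified sketch of that container (hypothetical WeakRegistry; the real class also has the remove() used above):

#include <map>
#include <memory>
#include <mutex>

template <typename K, typename V>
class WeakRegistry {
  std::mutex m;
  std::map<K, std::weak_ptr<V>> contents;
public:
  std::shared_ptr<V> lookup(const K& k) {
    std::lock_guard<std::mutex> l(m);
    auto it = contents.find(k);
    if (it == contents.end())
      return nullptr;
    return it->second.lock();   // may be null if the last ref dropped
  }
  template <typename A>
  std::shared_ptr<V> lookup_or_create(const K& k, const A& arg) {
    std::lock_guard<std::mutex> l(m);
    if (auto p = contents[k].lock())
      return p;                 // an entry for this key is still alive
    auto p = std::make_shared<V>(arg);
    contents[k] = p;
    return p;
  }
};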
// =========================================================
// RECOVERY

@@ -239,108 +239,6 @@ class CephContext;
typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
class MOSDOp;
class DeletingState {
Mutex lock;
Cond cond;
enum {
QUEUED,
CLEARING_DIR,
CLEARING_WAITING,
DELETING_DIR,
DELETED_DIR,
CANCELED,
} status;
bool stop_deleting;
public:
const spg_t pgid;
const PGRef old_pg_state;
explicit DeletingState(const pair<spg_t, PGRef> &in) :
lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
pgid(in.first), old_pg_state(in.second) {
}
/// transition status to CLEARING_WAITING
bool pause_clearing() {
Mutex::Locker l(lock);
assert(status == CLEARING_DIR);
if (stop_deleting) {
status = CANCELED;
cond.Signal();
return false;
}
status = CLEARING_WAITING;
return true;
} ///< @return false if we should cancel deletion
/// start or resume the clearing - transition the status to CLEARING_DIR
bool start_or_resume_clearing() {
Mutex::Locker l(lock);
assert(
status == QUEUED ||
status == DELETED_DIR ||
status == CLEARING_WAITING);
if (stop_deleting) {
status = CANCELED;
cond.Signal();
return false;
}
status = CLEARING_DIR;
return true;
} ///< @return false if we should cancel the deletion
/// transition status to CLEARING_DIR
bool resume_clearing() {
Mutex::Locker l(lock);
assert(status == CLEARING_WAITING);
if (stop_deleting) {
status = CANCELED;
cond.Signal();
return false;
}
status = CLEARING_DIR;
return true;
} ///< @return false if we should cancel deletion
/// transition status to deleting
bool start_deleting() {
Mutex::Locker l(lock);
assert(status == CLEARING_DIR);
if (stop_deleting) {
status = CANCELED;
cond.Signal();
return false;
}
status = DELETING_DIR;
return true;
} ///< @return false if we should cancel deletion
/// signal collection removal queued
void finish_deleting() {
Mutex::Locker l(lock);
assert(status == DELETING_DIR);
status = DELETED_DIR;
cond.Signal();
}
/// try to halt the deletion
bool try_stop_deletion() {
Mutex::Locker l(lock);
stop_deleting = true;
/**
* If we are in DELETING_DIR or CLEARING_DIR, there are in-progress
* operations we have to wait for before continuing on. States
* CLEARING_WAITING and QUEUED indicate that the remover will check
* stop_deleting before queueing any further operations. CANCELED
* indicates that the remover has already halted. DELETED_DIR
* indicates that the deletion has been fully queued.
*/
while (status == DELETING_DIR || status == CLEARING_DIR)
cond.Wait(lock);
return status != DELETED_DIR;
} ///< @return true if we don't need to recreate the collection
};
typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
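Read as a protocol, DeletingState has two participants: the removal worker drives the status forward, and a peering thread trying to recreate the PG calls try_stop_deletion(). A hypothetical walkthrough of one worker pass (illustrative only; every call is a method defined above):

void remover_pass(DeletingStateRef dstate) {
  if (!dstate->start_or_resume_clearing())
    return;                       // cancelled before doing any work
  // ... remove a batch of objects, queue the transaction ...
  if (!dstate->pause_clearing())
    return;                       // cancelled at the batch boundary
  // ... wait for the transaction to commit ...
  if (!dstate->resume_clearing())
    return;                       // cancelled while we were waiting
  // ... once the collection is empty ...
  if (!dstate->start_deleting())
    return;                       // cancelled at the last moment
  // ... queue removal of the now-empty collection ...
  dstate->finish_deleting();      // DELETED_DIR: too late to resurrect
}

try_stop_deletion() only blocks while the worker is actively in CLEARING_DIR or DELETING_DIR; every transition above rechecks stop_deleting under the lock, so cancellation is observed only at these boundaries, never mid-transaction, and the return value tells the caller whether the collection survived (true) or was fully deleted and must be recreated (false).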
class OSD;
class OSDService {
@@ -349,7 +247,6 @@ public:
CephContext *cct;
SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
const int whoami;
ObjectStore *&store;
LogClient &log_client;
@@ -1942,13 +1839,6 @@ public:
protected:
PG *_open_lock_pg(OSDMapRef createmap,
spg_t pg, bool no_lockdep_check=false);
enum res_result {
RES_PARENT, // resurrected a parent
RES_SELF, // resurrected self
RES_NONE // nothing relevant deleting
};
res_result _try_resurrect_pg(
OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
PG *_create_lock_pg(
OSDMapRef createmap,
@@ -2113,7 +2003,6 @@ protected:
void handle_force_recovery(Message *m);
void handle_pg_remove(OpRequestRef op);
void _remove_pg(PG *pg);
// -- commands --
struct Command {
@@ -2183,48 +2072,6 @@ protected:
bool scrub_load_below_threshold();
bool scrub_time_permit(utime_t now);
// -- removing --
struct RemoveWQ :
public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
CephContext* cct;
ObjectStore *&store;
list<pair<PGRef, DeletingStateRef> > remove_queue;
RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
ThreadPool *tp)
: ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
"OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
bool _empty() override {
return remove_queue.empty();
}
void _enqueue(pair<PGRef, DeletingStateRef> item) override {
remove_queue.push_back(item);
}
void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
remove_queue.push_front(item);
}
bool _dequeue(pair<PGRef, DeletingStateRef> item) {
ceph_abort();
}
pair<PGRef, DeletingStateRef> _dequeue() override {
assert(!remove_queue.empty());
pair<PGRef, DeletingStateRef> item = remove_queue.front();
remove_queue.pop_front();
return item;
}
void _process(pair<PGRef, DeletingStateRef>,
ThreadPool::TPHandle &) override;
void _clear() override {
remove_queue.clear();
}
int get_remove_queue_len() {
lock();
int r = remove_queue.size();
unlock();
return r;
}
} remove_wq;
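For reference, the two call sites that exercised this queue both appear in the OSD.cc hunks above:

  remove_wq.queue(make_pair(PGRef(pg), deleting));                   // from _remove_pg()
  logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());  // perf counter

The _enqueue_front() override exists solely so _process() can re-queue a partially cleared PG at the head via queue_front(), letting long removals proceed one chunk at a time without starving other PGs.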
// -- status reporting --
MPGStats *collect_pg_stats();
std::vector<OSDHealthMetric> get_health_metrics();

@@ -8752,13 +8752,3 @@ void PG::with_heartbeat_peers(std::function<void(int)> f)
}
heartbeat_peer_lock.Unlock();
}
void PG::pg_remove_object(const ghobject_t& oid, ObjectStore::Transaction *t)
{
OSDriver::OSTransaction _t(osdriver.get_transaction(t));
int r = snap_mapper.remove_oid(oid.hobj, &_t);
if (r != 0 && r != -ENOENT) {
ceph_abort();
}
t->remove(coll, oid);
}

@@ -480,7 +480,6 @@ public:
virtual void on_removal(ObjectStore::Transaction *t) = 0;
void _delete_some();
void pg_remove_object(const ghobject_t& oid, ObjectStore::Transaction *t);
// reference counting
#ifdef PG_DEBUG_REFS