osd: restructure consume_map in terms of shards

- new split priming machinery
- new primed split cleanup on pg removal
- cover the pg creation path

The old split tracking is now totally unused; will be removed in the next
patch.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2018-02-08 16:23:04 -06:00
parent 2a9c8d80ce
commit 295dfe0372
4 changed files with 344 additions and 194 deletions

View File

@ -512,6 +512,32 @@ void OSDService::complete_split(spg_t pgid)
in_progress_splits.erase(pgid);
}
/**
 * Collect the split children that pgid gains between two osdmaps.
 *
 * @param old_map       map the pg is currently at
 * @param new_map       map the pg is advancing to
 * @param pgid          candidate parent pg
 * @param new_children  out: any children found are inserted here; entries
 *                      already present are left untouched
 *
 * Nothing is added if the pool is absent from old_map, or if pgid's
 * placement seed is not below the old pg_num.
 */
void OSDService::identify_split_children(
  OSDMapRef old_map,
  OSDMapRef new_map,
  spg_t pgid,
  set<spg_t> *new_children)
{
  if (!old_map->have_pg_pool(pgid.pool())) {
    return;
  }

  int old_pgnum = old_map->get_pg_num(pgid.pool());
  int new_pgnum = get_possibly_deleted_pool_pg_num(
    new_map, pgid.pool());

  // pgid's seed is not covered by the old pg_num: it cannot be a parent
  // over this interval.
  if (pgid.ps() >= static_cast<unsigned>(old_pgnum)) {
    if (pgid.ps() >= static_cast<unsigned>(new_pgnum)) {
      dout(20) << __func__ << " " << pgid << " is post-split, skipping" << dendl;
    }
    return;
  }

  set<spg_t> found;
  if (!pgid.is_split(old_pgnum, new_pgnum, &found)) {
    return;
  }
  dout(20) << __func__ << " " << pgid << " children " << found << dendl;
  new_children->insert(found.begin(), found.end());
}
void OSDService::need_heartbeat_peer_update()
{
osd->need_heartbeat_peer_update();
@ -1757,11 +1783,14 @@ void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
e));
}
void OSDService::finish_pg_delete(PG *pg)
void OSDService::finish_pg_delete(PG *pg, unsigned old_pg_num)
{
osd->op_shardedwq.clear_pg_pointer(pg);
pg_remove_epoch(pg->get_pgid());
cancel_pending_splits_for_parent(pg->get_pgid());
osd->unregister_pg(pg);
for (auto shard : osd->shards) {
shard->unprime_split_children(pg->pg_id, old_pg_num);
}
}
void OSDService::_queue_for_recovery(
@ -2074,9 +2103,13 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
char order_lock[128] = {0};
snprintf(order_lock, sizeof(order_lock), "OSDShard.%d::sdata_op_ordering_lock", i);
OSDShard *one_shard = new OSDShard(
i,
cct,
this,
lock_name, order_lock,
cct->_conf->osd_op_pq_max_tokens_per_priority,
cct->_conf->osd_op_pq_min_cost, cct, op_queue);
cct->_conf->osd_op_pq_min_cost,
op_queue);
shards.push_back(one_shard);
}
}
@ -2654,8 +2687,6 @@ int OSD::init()
clear_temp_objects();
// initialize osdmap references in sharded wq
#warning fixme initialization
//op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
for (auto& shard : shards) {
shard->osdmap = osdmap;
}
@ -3830,7 +3861,7 @@ void OSD::_get_pgs(vector<PGRef> *v, bool clear_too)
{
v->clear();
for (auto& s : shards) {
Mutex::Locker l(s->sdata_lock);
Mutex::Locker l(s->sdata_op_ordering_lock);
for (auto& j : s->pg_slots) {
if (j.second.pg &&
!j.second.pg->is_deleted()) {
@ -3848,7 +3879,7 @@ void OSD::_get_pgids(vector<spg_t> *v)
{
v->clear();
for (auto& s : shards) {
Mutex::Locker l(s->sdata_lock);
Mutex::Locker l(s->sdata_op_ordering_lock);
for (auto& j : s->pg_slots) {
if (j.second.pg &&
!j.second.pg->is_deleted()) {
@ -3858,22 +3889,41 @@ void OSD::_get_pgids(vector<spg_t> *v)
}
}
void OSD::_register_pg(PGRef pg)
void OSD::register_pg(PGRef pg)
{
spg_t pgid = pg->get_pgid();
uint32_t shard_index = pgid.hash_to_shard(num_shards);
auto sdata = shards[shard_index];
Mutex::Locker l(sdata->sdata_lock);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto& slot = sdata->pg_slots[pgid];
assert(!slot.pg);
dout(20) << __func__ << " " << pgid << " " << pg << dendl;
slot.pg = pg;
++num_pgs;
}
/**
 * Detach a PG from its shard slot.
 *
 * The slot itself stays in pg_slots; only the PG reference is dropped
 * and num_pgs is decremented.  If the slot is missing or has no PG
 * attached, this only logs.
 */
void OSD::unregister_pg(PG *pg)
{
  spg_t pgid = pg->get_pgid();
  auto sdata = shards[pgid.hash_to_shard(num_shards)];
  Mutex::Locker l(sdata->sdata_op_ordering_lock);

  auto it = sdata->pg_slots.find(pg->pg_id);
  const bool attached = (it != sdata->pg_slots.end()) && it->second.pg;
  if (!attached) {
    dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl;
    return;
  }

  dout(20) << __func__ << " " << pg->pg_id << " cleared" << dendl;
  it->second.pg.reset();
  --num_pgs;
}
PGRef OSD::_lookup_pg(spg_t pgid)
{
uint32_t shard_index = pgid.hash_to_shard(num_shards);
auto sdata = shards[shard_index];
Mutex::Locker l(sdata->sdata_lock);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
if (p == sdata->pg_slots.end()) {
return nullptr;
@ -3970,31 +4020,28 @@ void OSD::load_pgs()
if (pg->dne()) {
dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
pg->ch = nullptr;
service.pg_remove_epoch(pg->pg_id);
pg->unlock();
{
// Delete pg
RWLock::WLocker l(pg_map_lock);
auto p = pg_map.find(pg->get_pgid());
assert(p != pg_map.end() && p->second == pg);
dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
pg_map.erase(p);
pg->put("PGMap");
}
unregister_pg(pg.get());
recursive_remove_collection(cct, store, pgid, *it);
continue;
}
set<spg_t> new_children;
service.init_splits_between(pg->pg_id, pg->get_osdmap(), osdmap, &new_children);
op_shardedwq.prime_splits(new_children);
service.identify_split_children(pg->get_osdmap(), osdmap, pg->pg_id,
&new_children);
if (!new_children.empty()) {
for (auto shard : shards) {
shard->prime_splits(osdmap, &new_children);
}
assert(new_children.empty());
}
pg->reg_next_scrub();
dout(10) << __func__ << " loaded " << *pg << dendl;
pg->unlock();
_register_pg(pg);
register_pg(pg);
++num;
}
dout(0) << __func__ << " opened " << num << " pgs" << dendl;
@ -4036,18 +4083,6 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
PGRef pg = _open_pg(createmap, pgid);
// We need to avoid racing with consume_map(). This should get
// redone as a structured waterfall of incoming osdmaps from osd ->
// shard, and something here that gets the to-be-split pg slots
// primed, even when they land on other shards. (I think it will be
// iterative to handle the case where it races with newer incoming
// maps.) For now, just cross our fingers. FIXME
{
set<spg_t> new_children;
service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children);
op_shardedwq.prime_splits(new_children);
}
pg->lock(true);
// we are holding the shard lock
@ -7823,15 +7858,16 @@ void OSD::_finish_splits(set<PGRef>& pgs)
epoch_t e = pg->get_osdmap_epoch();
pg->unlock();
service.complete_split(pg->get_pgid());
service.pg_add_epoch(pg->pg_id, e);
pg->lock();
pg->handle_initialize(&rctx);
pg->queue_null(e, e);
dispatch_context_transaction(rctx, pg);
op_shardedwq.wake_pg_split_waiters(pg->get_pgid());
pg->unlock();
unsigned shard_index = pg->pg_id.hash_to_shard(num_shards);
shards[shard_index]->register_and_wake_split_child(pg);
}
dispatch_context(rctx, 0, service.get_osdmap());
@ -7873,7 +7909,6 @@ void OSD::advance_pg(
lastmap->get_pg_num(pg->pg_id.pool()),
nextmap->get_pg_num(pg->pg_id.pool()),
&children)) {
service.mark_split_in_progress(pg->pg_id, children);
split_pgs(
pg, children, &new_pgs, lastmap, nextmap,
rctx);
@ -7906,17 +7941,25 @@ void OSD::consume_map()
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
// scan pg's
set<spg_t> new_children;
unsigned pushes_to_free = 0;
set<spg_t> newly_split;
for (auto& shard : shards) {
shard->consume_map(osdmap, &pushes_to_free, &newly_split);
}
if (!newly_split.empty()) {
for (auto& shard : shards) {
shard->prime_splits(osdmap, &newly_split);
}
assert(newly_split.empty());
}
vector<spg_t> pgids;
_get_pgids(&pgids);
// count (FIXME)
vector<PGRef> pgs;
_get_pgs(&pgs);
vector<spg_t> pgids;
pgids.reserve(pgs.size());
for (auto& pg : pgs) {
pgids.push_back(pg->get_pgid());
service.init_splits_between(pg->get_pgid(), service.get_osdmap(), osdmap,
&new_children);
// FIXME: this is lockless and racy, but we don't want to take pg lock
// here.
if (pg->is_primary())
@ -7926,6 +7969,7 @@ void OSD::consume_map()
else
num_pg_stray++;
}
{
// FIXME: move to OSDShard
[[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
@ -7939,9 +7983,6 @@ void OSD::consume_map()
}
}
service.expand_pg_num(service.get_osdmap(), osdmap, &new_children);
op_shardedwq.prime_splits(new_children);
service.pre_publish_map(osdmap);
service.await_reserved_maps();
service.publish_map(osdmap);
@ -7952,12 +7993,7 @@ void OSD::consume_map()
service.maybe_inject_dispatch_delay();
// remove any PGs which we no longer host from the pg_slot wait lists
dout(20) << __func__ << " checking pg_slot waiters" << dendl;
op_shardedwq.prune_or_wake_pg_waiters(osdmap, whoami);
service.maybe_inject_dispatch_delay();
// queue null events to push maps down to individual PGs
for (auto pgid : pgids) {
enqueue_peering_evt(
pgid,
@ -8131,11 +8167,8 @@ void OSD::split_pgs(
OSDMapRef nextmap,
PG::RecoveryCtx *rctx)
{
unsigned pg_num = nextmap->get_pg_num(
parent->pg_id.pool());
parent->update_snap_mapper_bits(
parent->get_pgid().get_split_bits(pg_num)
);
unsigned pg_num = nextmap->get_pg_num(parent->pg_id.pool());
parent->update_snap_mapper_bits(parent->get_pgid().get_split_bits(pg_num));
vector<object_stat_sum_t> updated_stats;
parent->start_split_stats(childpgids, &updated_stats);
@ -8145,8 +8178,7 @@ void OSD::split_pgs(
i != childpgids.end();
++i, ++stat_iter) {
assert(stat_iter != updated_stats.end());
dout(10) << "Splitting " << *parent << " into " << *i << dendl;
assert(service.splitting(*i));
dout(10) << __func__ << " splitting " << *parent << " into " << *i << dendl;
PG* child = _make_pg(nextmap, *i);
child->lock(true);
out_pgs->insert(child);
@ -8988,7 +9020,7 @@ void OSD::dequeue_peering_evt(
ThreadPool::TPHandle& handle)
{
PG::RecoveryCtx rctx = create_context();
auto curmap = service.get_osdmap();
auto curmap = sdata->osdmap;
epoch_t need_up_thru = 0, same_interval_since = 0;
if (!pg) {
if (const MQuery *q = dynamic_cast<const MQuery*>(evt->evt.get())) {
@ -9005,12 +9037,7 @@ void OSD::dequeue_peering_evt(
dispatch_context_transaction(rctx, pg, &handle);
need_up_thru = pg->get_need_up_thru();
same_interval_since = pg->get_same_interval_since();
bool deleted = pg->is_deleted();
pg->unlock();
if (deleted) {
#warning hmm?
}
}
if (need_up_thru) {
@ -9402,13 +9429,94 @@ int OSD::init_op_flags(OpRequestRef& op)
// =============================================================
#undef dout_context
#define dout_context osd->cct
#define dout_context cct
#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
#define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " "
void OSD::ShardedOpWQ::_wake_pg_slot(
/**
 * Push a new osdmap into this shard and reconcile every pg_slot with it.
 *
 * For each slot, under sdata_op_ordering_lock:
 *  - if the slot has a pg attached or is waiting_for_split, identify the
 *    split children it gains between the old and new map;
 *  - slots waiting for a split are otherwise left alone;
 *  - slots with peering waiters whose first epoch is now reached are
 *    requeued;
 *  - slots with plain waiters that no longer map to this osd have their
 *    stale items dropped, and the slot is pruned once it is empty, idle
 *    and has no pg.
 *
 * Children that hash to this shard are primed before returning.
 *
 * @param new_osdmap      map to install as the shard's osdmap
 * @param pushes_to_free  in/out: incremented by the reserved pushes of
 *                        every dropped item; caller releases them
 * @param new_children    in/out: newly identified split children; those
 *                        belonging to this shard are removed again by
 *                        _prime_splits(), the rest are left for the
 *                        caller to hand to the other shards
 */
void OSDShard::consume_map(
  OSDMapRef& new_osdmap,
  unsigned *pushes_to_free,
  set<spg_t> *new_children)
{
  Mutex::Locker l(sdata_op_ordering_lock);
  OSDMapRef old_osdmap = std::move(osdmap);
  osdmap = new_osdmap;
  dout(10) << new_osdmap->get_epoch()
           << " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")"
           << dendl;
  bool queued = false;

  // check slots
  auto p = pg_slots.begin();
  while (p != pg_slots.end()) {
    OSDShard::pg_slot& slot = p->second;
    const spg_t& pgid = p->first;
    dout(20) << __func__ << " " << pgid << dendl;
    // slot is a reference to the mapped pg_slot, so members are reached
    // with '.', not '->'.
    if (old_osdmap &&
        (slot.pg || slot.waiting_for_split)) {
      // only prime children for parent slots that are attached to a
      // pg or are waiting_for_split (because their ancestor is
      // attached to a pg).
      osd->service.identify_split_children(old_osdmap, new_osdmap, pgid,
                                           new_children);
    }
    if (slot.waiting_for_split) {
      dout(20) << __func__ << " " << pgid
               << " waiting for split" << dendl;
      ++p;
      continue;
    }
    if (!slot.waiting_peering.empty()) {
      epoch_t first = slot.waiting_peering.begin()->first;
      if (first <= osdmap->get_epoch()) {
        dout(20) << __func__ << " " << pgid
                 << " pending_peering first epoch " << first
                 << " <= " << osdmap->get_epoch() << ", requeueing" << dendl;
        _wake_pg_slot(pgid, slot);
        queued = true;
      }
      ++p;
      continue;
    }
    if (!slot.waiting.empty()) {
      if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) {
        dout(20) << __func__ << " " << pgid << " maps to us, keeping"
                 << dendl;
        ++p;
        continue;
      }
      // the pg no longer maps here: drop everything that is not newer
      // than the map we just installed.
      while (!slot.waiting.empty() &&
             slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
        auto& qi = slot.waiting.front();
        dout(20) << __func__ << " " << pgid
                 << " waiting item " << qi
                 << " epoch " << qi.get_map_epoch()
                 << " <= " << osdmap->get_epoch()
                 << ", stale, dropping" << dendl;
        *pushes_to_free += qi.get_reserved_pushes();
        slot.waiting.pop_front();
      }
      if (slot.waiting.empty() &&
          slot.num_running == 0 &&
          !slot.pg) {
        dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl;
        // Do not touch osd->num_pgs here: the counter is only bumped
        // when a pg is installed in a slot, and this slot has none, so
        // decrementing would under-count the unsigned counter.
        p = pg_slots.erase(p);
        continue;
      }
    }
    ++p;
  }

  // prime the children that land on this shard while we still hold the
  // ordering lock; the remainder stays in *new_children.
  _prime_splits(new_children);

  if (queued) {
    sdata_lock.Lock();
    sdata_cond.SignalOne();
    sdata_lock.Unlock();
  }
}
void OSDShard::_wake_pg_slot(
spg_t pgid,
OSDShard *sdata,
OSDShard::pg_slot& slot)
{
dout(20) << __func__ << " " << pgid
@ -9418,13 +9526,13 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
for (auto i = slot.to_process.rbegin();
i != slot.to_process.rend();
++i) {
sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
_enqueue_front(std::move(*i), osd->op_prio_cutoff);
}
slot.to_process.clear();
for (auto i = slot.waiting.rbegin();
i != slot.waiting.rend();
++i) {
sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
_enqueue_front(std::move(*i), osd->op_prio_cutoff);
}
slot.waiting.clear();
for (auto i = slot.waiting_peering.rbegin();
@ -9433,7 +9541,7 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
// this is overkill; we requeue everything, even if some of these items are
// waiting for maps we don't have yet. FIXME.
for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
sdata->_enqueue_front(std::move(*j), osd->op_prio_cutoff);
_enqueue_front(std::move(*j), osd->op_prio_cutoff);
}
}
slot.waiting_peering.clear();
@ -9441,6 +9549,104 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
++slot.requeue_seq;
}
/**
 * Prime this shard's slots for a set of split children computed against
 * as_of_osdmap.
 *
 * @param as_of_osdmap  map the caller used when identifying *pgids
 * @param pgids         in/out: children to prime; the ones that hash to
 *                      this shard are removed, the rest are left for the
 *                      other shards
 */
void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids)
{
  Mutex::Locker l(sdata_op_ordering_lock);
  _prime_splits(pgids);

  if (osdmap->get_epoch() <= as_of_osdmap->get_epoch()) {
    return;
  }

  // This shard is already on a newer map than the caller used, so the
  // remaining pgids may have split again in the meantime.
  set<spg_t> newer_children;
  for (const auto& pgid : *pgids) {
    osd->service.identify_split_children(as_of_osdmap, osdmap, pgid,
                                         &newer_children);
  }
  newer_children.insert(pgids->begin(), pgids->end());
  dout(10) << "as_of_osdmap " << as_of_osdmap->get_epoch() << " < shard "
           << osdmap->get_epoch() << ", new children " << newer_children
           << dendl;
  _prime_splits(&newer_children);
  // Whatever remains in newer_children belongs to other shards and is
  // deliberately discarded.  If this shard is ahead because of a racing
  // consume_map (e.g. we were called from _process for a freshly created
  // pg), the slower shards identify those descendants themselves when
  // their own osdmap advances.  _prime_splits() tolerates a pgid that
  // is already primed.
}
/**
 * Mark the slots of the given split children as waiting_for_split.
 *
 * Only pgids that hash to this shard are handled; each of those is
 * removed from *pgids once its slot is primed (creating the slot if
 * needed).  Pgids belonging to other shards are left in the set.
 *
 * Every caller visible in this file invokes this with
 * sdata_op_ordering_lock held; the function takes no lock itself.
 *
 * @param pgids  in/out: candidate children
 */
void OSDShard::_prime_splits(set<spg_t> *pgids)
{
  dout(10) << *pgids << dendl;
  auto p = pgids->begin();
  while (p != pgids->end()) {
    unsigned shard_index = p->hash_to_shard(osd->num_shards);
    if (shard_index != shard_id) {
      ++p;
      continue;
    }
    auto i = pg_slots.find(*p);
    if (i == pg_slots.end()) {
      dout(10) << "priming slot " << *p << dendl;
      pg_slots[*p].waiting_for_split = true;
    } else if (i->second.waiting_for_split) {
      // pg_slots maps to pg_slot values, so the slot is i->second itself.
      dout(10) << "slot " << *p << " already primed" << dendl;
    } else {
      dout(10) << "priming (existing) slot " << *p << dendl;
      i->second.waiting_for_split = true;
    }
    p = pgids->erase(p);
  }
}
/**
 * Install a freshly split child PG into its (already primed) slot and
 * requeue whatever was waiting on that slot.
 *
 * Asserts that the slot exists, has no PG yet and is waiting_for_split.
 * After the ordering lock is released, one waiter on sdata_cond is
 * signalled under sdata_lock.
 */
void OSDShard::register_and_wake_split_child(PG *pg)
{
  {
    Mutex::Locker l(sdata_op_ordering_lock);
    dout(10) << pg->pg_id << " " << pg << dendl;
    auto it = pg_slots.find(pg->pg_id);
    assert(it != pg_slots.end());
    auto& child_slot = it->second;
    assert(!child_slot.pg);
    assert(child_slot.waiting_for_split);
    child_slot.pg = pg;
    ++osd->num_pgs;
    _wake_pg_slot(pg->pg_id, child_slot);
  }

  // signal outside the ordering lock
  Mutex::Locker wake(sdata_lock);
  sdata_cond.SignalOne();
}
/**
 * Drop every slot on this shard that belongs to a split descendant of
 * parent, as determined by get_ancestor(old_pg_num).
 *
 * @param parent      pg being removed
 * @param old_pg_num  pg_num used to map each slot's pgid to its ancestor
 *
 * NOTE(review): the slots are erased as-is; anything still queued on
 * them goes away with the slot.  Confirm that is intended.
 */
void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
{
  Mutex::Locker l(sdata_op_ordering_lock);
  auto it = pg_slots.begin();
  while (it != pg_slots.end()) {
    const spg_t& candidate = it->first;
    const bool is_descendant =
      candidate != parent &&
      candidate.get_ancestor(old_pg_num) == parent;
    if (!is_descendant) {
      ++it;
      continue;
    }
    dout(10) << __func__ << " parent " << parent << " clearing " << candidate
             << dendl;
    it = pg_slots.erase(it);
  }
}
// =============================================================
#undef dout_context
#define dout_context osd->cct
#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
{
uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
@ -9450,7 +9656,7 @@ void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
if (p != sdata->pg_slots.end()) {
_wake_pg_slot(pgid, sdata, p->second);
sdata->_wake_pg_slot(pgid, p->second);
queued = true;
}
}
@ -9461,102 +9667,6 @@ void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
}
}
void OSD::ShardedOpWQ::prime_splits(const set<spg_t>& pgs)
{
dout(20) << __func__ << " " << pgs << dendl;
for (auto pgid : pgs) {
unsigned shard_index = pgid.hash_to_shard(osd->shards.size());
OSDShard* sdata = osd->shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
OSDShard::pg_slot& slot = sdata->pg_slots[pgid];
slot.waiting_for_split = true;
}
}
// Install osdmap on every shard and, per shard, walk all pg_slots:
// requeue peering waiters whose epoch has arrived, drop stale waiting
// items for pgs that no longer map to whoami, and erase slots that end
// up empty, idle and without a pg.  Reserved pushes of dropped items are
// released once at the end.
void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
{
unsigned pushes_to_free = 0;
// NOTE: declared outside the shard loop and never reset, so once any
// shard requeues something every later shard is signalled as well.
bool queued = false;
for (auto sdata : osd->shards) {
Mutex::Locker l(sdata->sdata_op_ordering_lock);
sdata->osdmap = osdmap;
auto p = sdata->pg_slots.begin();
while (p != sdata->pg_slots.end()) {
OSDShard::pg_slot& slot = p->second;
// slots primed for a split are left untouched
if (slot.waiting_for_split) {
dout(20) << __func__ << " " << p->first
<< " waiting for split" << dendl;
++p;
continue;
}
// peering waiters: requeue the whole slot once the earliest awaited
// epoch is covered by the new map
if (!slot.waiting_peering.empty()) {
epoch_t first = slot.waiting_peering.begin()->first;
if (first <= osdmap->get_epoch()) {
dout(20) << __func__ << " " << p->first
<< " pending_peering first epoch " << first
<< " <= " << osdmap->get_epoch() << ", requeueing" << dendl;
_wake_pg_slot(p->first, sdata, slot);
queued = true;
}
++p;
continue;
}
if (!slot.waiting.empty()) {
if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
dout(20) << __func__ << " " << p->first << " maps to us, keeping"
<< dendl;
++p;
continue;
}
// pg does not map to us: discard items not newer than the new map
while (!slot.waiting.empty() &&
slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
auto& qi = slot.waiting.front();
dout(20) << __func__ << " " << p->first
<< " waiting item " << qi
<< " epoch " << qi.get_map_epoch()
<< " <= " << osdmap->get_epoch()
<< ", stale, dropping" << dendl;
pushes_to_free += qi.get_reserved_pushes();
slot.waiting.pop_front();
}
if (slot.waiting.empty() &&
slot.num_running == 0 &&
!slot.pg) {
dout(20) << __func__ << " " << p->first << " empty, pruning" << dendl;
// erase() returns the next iterator, so skip the ++p below
p = sdata->pg_slots.erase(p);
// NOTE(review): num_pgs is decremented although this slot has no pg
// attached -- confirm the accounting.
--osd->num_pgs;
continue;
}
}
++p;
}
if (queued) {
sdata->sdata_lock.Lock();
sdata->sdata_cond.SignalOne();
sdata->sdata_lock.Unlock();
}
}
if (pushes_to_free > 0) {
osd->service.release_reserved_pushes(pushes_to_free);
}
}
// Clear the PG reference cached in pg's shard slot (if the slot exists)
// and decrement num_pgs.  The slot itself is kept.
void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg)
{
spg_t pgid = pg->get_pgid();
uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
auto sdata = osd->shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
if (p != sdata->pg_slots.end()) {
auto& slot = p->second;
// the slot may already be empty; otherwise it must point at this pg
assert(!slot.pg || slot.pg == pg);
dout(20) << __func__ << " " << pgid << " pg " << pg << " cleared" << dendl;
slot.pg = nullptr;
// NOTE(review): this runs even when slot.pg was already null (the assert
// above allows that case) -- confirm num_pgs cannot be double-decremented.
--osd->num_pgs;
}
}
void OSD::ShardedOpWQ::clear_pg_slots()
{
for (auto sdata : osd->shards) {
@ -9672,7 +9782,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
--slot.num_running;
if (slot.to_process.empty()) {
// raced with wake_pg_waiters or prune_or_wake_pg_waiters
// raced with wake_pg_waiters or consume_map
dout(20) << __func__ << " " << token
<< " nothing queued" << dendl;
if (pg) {
@ -9705,10 +9815,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
slot.to_process.pop_front();
dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
unsigned pushes_to_free = 0;
set<spg_t> new_children;
OSDMapRef osdmap;
while (!pg) {
// should this pg shard exist on this osd in this (or a later) epoch?
OSDMapRef osdmap = sdata->osdmap;
osdmap = sdata->osdmap;
const PGCreateInfo *create_info = qi.creates_pg();
if (slot.waiting_for_split) {
dout(20) << __func__ << " " << token
@ -9739,7 +9851,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
// we created the pg! drop out and continue "normally"!
slot.pg = pg; // install in shard slot
++osd->num_pgs;
_wake_pg_slot(token, sdata, slot);
sdata->_wake_pg_slot(token, slot);
// identify split children between create epoch and shard epoch.
osd->service.identify_split_children(
pg->get_osdmap(), osdmap, pg->pg_id, &new_children);
sdata->_prime_splits(&new_children);
// distribute remaining split children to other shards below!
break;
}
dout(20) << __func__ << " ignored create on " << qi << dendl;
@ -9796,6 +9914,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
}
sdata->sdata_op_ordering_lock.Unlock();
if (!new_children.empty()) {
for (auto shard : osd->shards) {
shard->prime_splits(osdmap, &new_children);
}
assert(new_children.empty());
}
if (pushes_to_free) {
osd->service.release_reserved_pushes(pushes_to_free);
}

View File

@ -799,7 +799,7 @@ public:
void queue_for_snap_trim(PG *pg);
void queue_for_scrub(PG *pg, bool with_high_priority);
void queue_for_pg_delete(spg_t pgid, epoch_t e);
void finish_pg_delete(PG *pg);
void finish_pg_delete(PG *pg, unsigned old_pg_num);
private:
// -- pg recovery and associated throttling --
@ -922,6 +922,13 @@ public:
return get_deleted_pool_pg_num(pool);
}
/// identify split child pgids over an osdmap interval
void identify_split_children(
OSDMapRef old_map,
OSDMapRef new_map,
spg_t pgid,
set<spg_t> *new_children);
void need_heartbeat_peer_update();
void init();
@ -1115,6 +1122,9 @@ enum class io_queue {
};
struct OSDShard {
const unsigned shard_id;
CephContext *cct;
OSD *osd;
Mutex sdata_lock;
Cond sdata_cond;
@ -1142,7 +1152,7 @@ struct OSDShard {
/// map of slots for each spg_t. maintains ordering of items dequeued
/// from pqueue while _process thread drops shard lock to acquire the
/// pg lock. slots are removed only by prune_or_wake_pg_waiters.
/// pg lock. slots are removed by consume_map.
unordered_map<spg_t,pg_slot> pg_slots;
/// priority queue
@ -1163,11 +1173,30 @@ struct OSDShard {
priority, cost, std::move(item));
}
/// push osdmap into shard
void consume_map(
OSDMapRef& osdmap,
unsigned *pushes_to_free,
set<spg_t> *new_children);
void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot& slot);
void _prime_splits(set<spg_t> *pgids);
void prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids);
void register_and_wake_split_child(PG *pg);
void unprime_split_children(spg_t parent, unsigned old_pg_num);
OSDShard(
int id,
CephContext *cct,
OSD *osd,
string lock_name, string ordering_lock,
uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
uint64_t max_tok_per_prio, uint64_t min_cost,
io_queue opqueue)
: sdata_lock(lock_name.c_str(), false, true, false, cct),
: shard_id(id),
cct(cct),
osd(osd),
sdata_lock(lock_name.c_str(), false, true, false, cct),
sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
false, cct) {
if (opqueue == io_queue::weightedpriority) {
@ -1610,7 +1639,9 @@ private:
friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
const io_queue op_queue;
public:
const unsigned int op_prio_cutoff;
protected:
/*
* The ordered op delivery chain is:
@ -1659,17 +1690,6 @@ private:
/// wake any pg waiters after a PG is split
void wake_pg_split_waiters(spg_t pgid);
void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot);
/// prime slots for splitting pgs
void prime_splits(const set<spg_t>& pgs);
/// prune ops (and possibly pg_slots) for pgs that shouldn't be here
void prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami);
/// clear cached PGRef on pg deletion
void clear_pg_pointer(PG *pg);
/// clear pg_slots on shutdown
void clear_pg_slots();
@ -1839,10 +1859,10 @@ public:
vector<OSDShard*> shards;
uint32_t num_shards = 0;
protected:
// -- placement groups --
std::atomic<size_t> num_pgs = {0};
protected:
std::mutex pending_creates_lock;
using create_from_osd_t = std::pair<pg_t, bool /* is primary*/>;
std::set<create_from_osd_t> pending_creates_from_osd;
@ -1852,7 +1872,8 @@ protected:
PGRef _lookup_pg(spg_t pgid);
PG *_lookup_lock_pg(spg_t pgid);
void _register_pg(PGRef pg);
void register_pg(PGRef pg);
void unregister_pg(PG *pg);
void _get_pgs(vector<PGRef> *v, bool clear_too=false);
void _get_pgids(vector<spg_t> *v);

View File

@ -6543,7 +6543,7 @@ void PG::_delete_some()
osd->meta_ch, std::move(t));
assert(r == 0);
osd->finish_pg_delete(this);
osd->finish_pg_delete(this, pool.info.get_pg_num());
deleted = true;
// cancel reserver here, since the PG is about to get deleted and the

View File

@ -520,6 +520,11 @@ struct spg_t {
/// Convenience overload: forwards s.c_str() to another parse() overload
/// and returns its result.
bool parse(const std::string& s) {
return parse(s.c_str());
}
/// Return the spg_t built from pgid.get_ancestor(old_pg_num), keeping
/// this object's shard.
spg_t get_ancestor(unsigned old_pg_num) const {
  const auto ancestor_pgid = pgid.get_ancestor(old_pg_num);
  return spg_t(ancestor_pgid, shard);
}
bool is_split(unsigned old_pg_num, unsigned new_pg_num,
set<spg_t> *pchildren) const {
set<pg_t> _children;