mirror of
https://github.com/ceph/ceph
synced 2024-12-26 21:43:10 +00:00
osd: implement scrub_purged_snaps command
This a naive one-shot implementation that does the full scan synchronously in the command thread. It shouldn't block any IO except to the extent that it will compete for IO reading the underlying snapmapper omap object. When we discover mapped objects that are covered by ranges of snaps that should be purged, we requeue the snapid for trim on the relevant PG(s). For these 'repeat' trims we skip the final step(s) to mark the snapid as purged, since that presumably already happened some time ago. Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
parent
7628474723
commit
6192fb6031
@ -6060,6 +6060,9 @@ COMMAND("cache status",
|
||||
COMMAND("send_beacon",
|
||||
"Send OSD beacon to mon immediately",
|
||||
"osd", "r")
|
||||
COMMAND("scrub_purged_snaps",
|
||||
"Scrub purged_snaps vs snapmapper index",
|
||||
"osd", "r")
|
||||
};
|
||||
|
||||
void OSD::do_command(
|
||||
@ -6589,6 +6592,36 @@ int OSD::_do_command(
|
||||
if (is_active()) {
|
||||
send_beacon(ceph::coarse_mono_clock::now());
|
||||
}
|
||||
} else if (prefix == "scrub_purged_snaps") {
|
||||
SnapMapper::Scrubber s(cct, store, service.meta_ch,
|
||||
make_snapmapper_oid());
|
||||
s.run();
|
||||
set<pair<spg_t,snapid_t>> queued;
|
||||
for (auto& [pool, snap, hash, shard] : s.stray) {
|
||||
const pg_pool_t *pi = get_osdmap()->get_pg_pool(pool);
|
||||
if (!pi) {
|
||||
dout(20) << __func__ << " pool " << pool << " dne" << dendl;
|
||||
continue;
|
||||
}
|
||||
pg_t pgid(pi->raw_hash_to_pg(hash), pool);
|
||||
spg_t spgid(pgid, shard);
|
||||
pair<spg_t,snapid_t> p(spgid, snap);
|
||||
if (queued.count(p)) {
|
||||
dout(20) << __func__ << " pg " << spgid << " snap " << snap
|
||||
<< " already queued" << dendl;
|
||||
continue;
|
||||
}
|
||||
PGRef pg = lookup_lock_pg(spgid);
|
||||
if (!pg) {
|
||||
dout(20) << __func__ << " pg " << spgid << " not found" << dendl;
|
||||
continue;
|
||||
}
|
||||
queued.insert(p);
|
||||
dout(10) << __func__ << " requeue pg " << spgid << " " << pg << " snap "
|
||||
<< snap << dendl;
|
||||
pg->queue_snap_retrim(snap);
|
||||
pg->unlock();
|
||||
}
|
||||
} else {
|
||||
ss << "unrecognized command '" << prefix << "'";
|
||||
r = -EINVAL;
|
||||
|
@ -320,6 +320,7 @@ void PG::clear_primary_state()
|
||||
projected_log = PGLog::IndexedLog();
|
||||
|
||||
snap_trimq.clear();
|
||||
snap_trimq_repeat.clear();
|
||||
finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
|
||||
release_pg_backoffs();
|
||||
|
||||
@ -533,6 +534,7 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
|
||||
child->update_snap_mapper_bits(split_bits);
|
||||
|
||||
child->snap_trimq = snap_trimq;
|
||||
child->snap_trimq_repeat = snap_trimq_repeat;
|
||||
|
||||
_split_into(child_pgid, child, split_bits);
|
||||
|
||||
@ -1669,6 +1671,27 @@ void PG::on_active_advmap(const OSDMapRef &osdmap)
|
||||
}
|
||||
}
|
||||
|
||||
void PG::queue_snap_retrim(snapid_t snap)
|
||||
{
|
||||
if (!is_active() ||
|
||||
!is_primary()) {
|
||||
dout(10) << __func__ << " snap " << snap << " - not active and primary"
|
||||
<< dendl;
|
||||
return;
|
||||
}
|
||||
if (!snap_trimq.contains(snap)) {
|
||||
snap_trimq.insert(snap);
|
||||
snap_trimq_repeat.insert(snap);
|
||||
dout(20) << __func__ << " snap " << snap
|
||||
<< ", trimq now " << snap_trimq
|
||||
<< ", repeat " << snap_trimq_repeat << dendl;
|
||||
kick_snap_trim();
|
||||
} else {
|
||||
dout(20) << __func__ << " snap " << snap
|
||||
<< " already in trimq " << snap_trimq << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
void PG::on_active_actmap()
|
||||
{
|
||||
if (cct->_conf->osd_check_for_log_corruption)
|
||||
@ -3312,8 +3335,14 @@ ostream& operator<<(ostream& out, const PG& pg)
|
||||
// only show a count if the set is large
|
||||
if (pg.snap_trimq.num_intervals() > 16) {
|
||||
out << pg.snap_trimq.size();
|
||||
if (!pg.snap_trimq_repeat.empty()) {
|
||||
out << "(" << pg.snap_trimq_repeat.size() << ")";
|
||||
}
|
||||
} else {
|
||||
out << pg.snap_trimq;
|
||||
if (!pg.snap_trimq_repeat.empty()) {
|
||||
out << "(" << pg.snap_trimq_repeat << ")";
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!pg.recovery_state.get_info().purged_snaps.empty()) {
|
||||
|
@ -457,6 +457,8 @@ public:
|
||||
void on_active_actmap() override;
|
||||
void on_active_advmap(const OSDMapRef &osdmap) override;
|
||||
|
||||
void queue_snap_retrim(snapid_t snap);
|
||||
|
||||
void on_backfill_reserved() override;
|
||||
void on_backfill_canceled() override;
|
||||
void on_recovery_reserved() override;
|
||||
@ -653,6 +655,7 @@ protected:
|
||||
|
||||
// ------------------
|
||||
interval_set<snapid_t> snap_trimq;
|
||||
set<snapid_t> snap_trimq_repeat;
|
||||
|
||||
/* You should not use these items without taking their respective queue locks
|
||||
* (if they have one) */
|
||||
|
@ -15153,26 +15153,31 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
|
||||
// Done!
|
||||
ldout(pg->cct, 10) << "got ENOENT" << dendl;
|
||||
|
||||
ldout(pg->cct, 10) << "adding snap " << snap_to_trim
|
||||
<< " to purged_snaps"
|
||||
<< dendl;
|
||||
|
||||
ObjectStore::Transaction t;
|
||||
pg->recovery_state.adjust_purged_snaps(
|
||||
[snap_to_trim](auto &purged_snaps) {
|
||||
purged_snaps.insert(snap_to_trim);
|
||||
});
|
||||
pg->write_if_dirty(t);
|
||||
|
||||
pg->snap_trimq.erase(snap_to_trim);
|
||||
ldout(pg->cct, 10) << "purged_snaps now "
|
||||
<< pg->info.purged_snaps << ", snap_trimq now "
|
||||
<< pg->snap_trimq << dendl;
|
||||
|
||||
int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
|
||||
ceph_assert(tr == 0);
|
||||
if (pg->snap_trimq_repeat.count(snap_to_trim)) {
|
||||
ldout(pg->cct, 10) << " removing from snap_trimq_repeat" << dendl;
|
||||
pg->snap_trimq_repeat.erase(snap_to_trim);
|
||||
} else {
|
||||
ldout(pg->cct, 10) << "adding snap " << snap_to_trim
|
||||
<< " to purged_snaps"
|
||||
<< dendl;
|
||||
ObjectStore::Transaction t;
|
||||
pg->recovery_state.adjust_purged_snaps(
|
||||
[snap_to_trim](auto &purged_snaps) {
|
||||
purged_snaps.insert(snap_to_trim);
|
||||
});
|
||||
pg->write_if_dirty(t);
|
||||
|
||||
pg->recovery_state.share_pg_info();
|
||||
ldout(pg->cct, 10) << "purged_snaps now "
|
||||
<< pg->info.purged_snaps << ", snap_trimq now "
|
||||
<< pg->snap_trimq << dendl;
|
||||
|
||||
int tr = pg->osd->store->queue_transaction(pg->ch, std::move(t), NULL);
|
||||
ceph_assert(tr == 0);
|
||||
|
||||
pg->recovery_state.share_pg_info();
|
||||
}
|
||||
post_event(KickTrim());
|
||||
return transit< NotTrimming >();
|
||||
}
|
||||
|
@ -418,6 +418,7 @@ void SnapMapper::make_purged_snap_key_value(
|
||||
{
|
||||
string k = make_purged_snap_key(pool, end - 1);
|
||||
auto& v = (*m)[k];
|
||||
ceph::encode(pool, v);
|
||||
ceph::encode(begin, v);
|
||||
ceph::encode(end, v);
|
||||
}
|
||||
@ -446,6 +447,8 @@ int SnapMapper::_lookup_purged_snap(
|
||||
}
|
||||
bufferlist v = it->value();
|
||||
auto p = v.cbegin();
|
||||
int64_t gotpool;
|
||||
decode(gotpool, p);
|
||||
decode(*begin, p);
|
||||
decode(*end, p);
|
||||
if (snap < *begin || snap >= *end) {
|
||||
@ -520,6 +523,116 @@ void SnapMapper::record_purged_snaps(
|
||||
}
|
||||
|
||||
|
||||
bool SnapMapper::Scrubber::_parse_p()
|
||||
{
|
||||
if (!psit->valid()) {
|
||||
pool = -1;
|
||||
return false;
|
||||
}
|
||||
if (psit->key().find(PURGED_SNAP_PREFIX) != 0) {
|
||||
pool = -1;
|
||||
return false;
|
||||
}
|
||||
bufferlist v = psit->value();
|
||||
auto p = v.cbegin();
|
||||
ceph::decode(pool, p);
|
||||
ceph::decode(begin, p);
|
||||
ceph::decode(end, p);
|
||||
dout(20) << __func__ << " purged_snaps pool " << pool
|
||||
<< " [" << begin << "," << end << ")" << dendl;
|
||||
psit->next();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapMapper::Scrubber::_parse_m()
|
||||
{
|
||||
if (!mapit->valid()) {
|
||||
return false;
|
||||
}
|
||||
if (mapit->key().find(MAPPING_PREFIX) != 0) {
|
||||
return false;
|
||||
}
|
||||
auto v = mapit->value();
|
||||
auto p = v.cbegin();
|
||||
mapping.decode(p);
|
||||
|
||||
{
|
||||
unsigned long long p, s;
|
||||
long sh;
|
||||
string k = mapit->key();
|
||||
int r = sscanf(k.c_str(), "SNA_%lld_%llx.%lx", &p, &s, &sh);
|
||||
if (r != 1) {
|
||||
shard = shard_id_t::NO_SHARD;
|
||||
} else {
|
||||
shard = shard_id_t(sh);
|
||||
}
|
||||
}
|
||||
dout(20) << __func__ << " mapping pool " << mapping.hoid.pool
|
||||
<< " snap " << mapping.snap
|
||||
<< " shard " << shard
|
||||
<< " " << mapping.hoid << dendl;
|
||||
mapit->next();
|
||||
return true;
|
||||
}
|
||||
|
||||
void SnapMapper::Scrubber::_init()
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
}
|
||||
|
||||
void SnapMapper::Scrubber::run()
|
||||
{
|
||||
dout(10) << __func__ << dendl;
|
||||
|
||||
_init();
|
||||
|
||||
psit = store->get_omap_iterator(ch, hoid);
|
||||
psit->upper_bound(PURGED_SNAP_PREFIX);
|
||||
_parse_p();
|
||||
|
||||
mapit = store->get_omap_iterator(ch, hoid);
|
||||
mapit->upper_bound(MAPPING_PREFIX);
|
||||
|
||||
while (_parse_m()) {
|
||||
// advance to next purged_snaps range?
|
||||
while (pool >= 0 &&
|
||||
(mapping.hoid.pool > pool ||
|
||||
(mapping.hoid.pool == pool && mapping.snap >= end))) {
|
||||
_parse_p();
|
||||
}
|
||||
if (pool < 0) {
|
||||
dout(10) << __func__ << " passed final purged_snaps interval, rest ok"
|
||||
<< dendl;
|
||||
break;
|
||||
}
|
||||
if (mapping.hoid.pool < pool ||
|
||||
mapping.snap < begin) {
|
||||
// ok
|
||||
dout(20) << __func__ << " ok " << mapping.hoid
|
||||
<< " snap " << mapping.snap
|
||||
<< " precedes pool " << pool
|
||||
<< " purged_snaps [" << begin << "," << end << ")" << dendl;
|
||||
} else {
|
||||
assert(mapping.snap >= begin &&
|
||||
mapping.snap < end &&
|
||||
mapping.hoid.pool == pool);
|
||||
// invalid
|
||||
dout(10) << __func__ << " stray " << mapping.hoid
|
||||
<< " snap " << mapping.snap
|
||||
<< " in pool " << pool
|
||||
<< " shard " << shard
|
||||
<< " purged_snaps [" << begin << "," << end << ")" << dendl;
|
||||
stray.emplace_back(std::tuple<int64_t,snapid_t,uint32_t,shard_id_t>(
|
||||
pool, mapping.snap, mapping.hoid.get_hash(),
|
||||
shard
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
dout(10) << __func__ << " end, found " << stray.size() << " stray" << dendl;
|
||||
}
|
||||
|
||||
|
||||
// -------------------------------------
|
||||
// legacy conversion/support
|
||||
|
||||
|
@ -138,6 +138,41 @@ public:
|
||||
static const char *PURGED_SNAP_EPOCH_PREFIX;
|
||||
static const char *PURGED_SNAP_PREFIX;
|
||||
|
||||
struct Scrubber {
|
||||
CephContext *cct;
|
||||
ObjectStore *store;
|
||||
ObjectStore::CollectionHandle ch;
|
||||
ghobject_t hoid;
|
||||
|
||||
ObjectMap::ObjectMapIterator psit;
|
||||
int64_t pool;
|
||||
snapid_t begin, end;
|
||||
|
||||
bool _parse_p(); ///< advance the purged_snaps pointer
|
||||
|
||||
ObjectMap::ObjectMapIterator mapit;
|
||||
Mapping mapping;
|
||||
shard_id_t shard;
|
||||
|
||||
bool _parse_m(); ///< advance the (object) mapper pointer
|
||||
|
||||
vector<std::tuple<int64_t, snapid_t, uint32_t, shard_id_t>> stray;
|
||||
|
||||
Scrubber(
|
||||
CephContext *cct,
|
||||
ObjectStore *store,
|
||||
ObjectStore::CollectionHandle& ch,
|
||||
ghobject_t hoid)
|
||||
: cct(cct),
|
||||
store(store),
|
||||
ch(ch),
|
||||
hoid(hoid) {}
|
||||
|
||||
|
||||
void _init();
|
||||
void run();
|
||||
};
|
||||
|
||||
static int convert_legacy(
|
||||
CephContext *cct,
|
||||
ObjectStore *store,
|
||||
@ -152,6 +187,11 @@ public:
|
||||
ghobject_t hoid,
|
||||
ObjectStore::Transaction *t,
|
||||
map<epoch_t,mempool::osdmap::map<int64_t,snap_interval_set_t>> purged_snaps);
|
||||
static void scrub_purged_snaps(
|
||||
CephContext *cct,
|
||||
ObjectStore *store,
|
||||
ObjectStore::CollectionHandle& ch,
|
||||
ghobject_t hoid);
|
||||
|
||||
private:
|
||||
static int _lookup_purged_snap(
|
||||
|
Loading…
Reference in New Issue
Block a user