mirror of https://github.com/ceph/ceph
osd: piecewise scrub
Perform scrub in stages, with each unit of work requeuing an item in the work queue.

Signed-off-by: Sage Weil <sage@redhat.com>
parent dca1257ab1
commit bf16f59887
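The commit message describes each scrub pass doing a bounded unit of work and requeuing itself when more remains. A minimal standalone sketch of that requeue pattern, with made-up names and simplified types rather than the actual Ceph classes:

```cpp
// Illustrative only: a work item scans a few objects per call and reports
// "in progress" until its cursor reaches the end; the driver requeues it.
#include <algorithm>
#include <iostream>
#include <queue>
#include <string>
#include <vector>

constexpr int EINPROGRESS_RC = -115;   // stand-in for -EINPROGRESS

struct ScrubCursor {
  std::vector<std::string> objects;    // objects still to scan
  size_t pos = 0;                      // resume point between calls
  bool done() const { return pos >= objects.size(); }
};

// One bounded unit of work: scan at most `batch` objects, then yield.
int scrub_step(ScrubCursor& cur, size_t batch) {
  size_t end = std::min(cur.pos + batch, cur.objects.size());
  for (; cur.pos < end; ++cur.pos)
    std::cout << "scrubbed " << cur.objects[cur.pos] << "\n";
  return cur.done() ? 0 : EINPROGRESS_RC;
}

int main() {
  ScrubCursor cur{{"obj1", "obj2", "obj3", "obj4", "obj5"}};
  std::queue<ScrubCursor*> workq;      // stands in for the OSD work queue
  workq.push(&cur);
  while (!workq.empty()) {
    ScrubCursor* item = workq.front();
    workq.pop();
    if (scrub_step(*item, 2) == EINPROGRESS_RC)
      workq.push(item);                // requeue: more work remains
  }
}
```

In the actual diff below, the same idea shows up as `build_scrub_map_chunk()` and `be_deep_scrub()` returning `-EINPROGRESS` while a `ScrubMapBuilder` records the resume position.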
@@ -743,6 +743,7 @@ OPTION(osd_scrub_auto_repair_num_errors, OPT_U32) // only auto-repair when num
OPTION(osd_deep_scrub_interval, OPT_FLOAT) // once a week
OPTION(osd_deep_scrub_randomize_ratio, OPT_FLOAT) // scrubs will randomly become deep scrubs at this rate (0.15 -> 15% of scrubs are deep)
OPTION(osd_deep_scrub_stride, OPT_INT)
OPTION(osd_deep_scrub_keys, OPT_INT)
OPTION(osd_deep_scrub_update_digest_min_age, OPT_INT) // objects must be this old (seconds) before we update the whole-object digest on scrub
OPTION(osd_skip_data_digest, OPT_BOOL)
OPTION(osd_class_dir, OPT_STR) // where rados plugins are stored

@@ -2693,6 +2693,10 @@ std::vector<Option> get_global_options() {
.set_default(524288)
.set_description(""),

Option("osd_deep_scrub_keys", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(1024)
.set_description(""),

Option("osd_deep_scrub_update_digest_min_age", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(2_hr)
.set_description(""),
@@ -2417,59 +2417,58 @@ void ECBackend::rollback_append(
old_size));
}

void ECBackend::be_deep_scrub(
int ECBackend::be_deep_scrub(
const hobject_t &poid,
uint32_t seed,
ScrubMap::object &o,
ThreadPool::TPHandle &handle,
ScrubMap* const map) {
bufferhash h(-1); // we always used -1
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o)
{
dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
int r;
uint64_t stride = cct->_conf->osd_deep_scrub_stride;
if (stride % sinfo.get_chunk_size())
stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
utime_t sleeptime;
sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
uint64_t pos = 0;
bool skip_data_digest = store->has_builtin_csum() &&
g_conf->osd_skip_data_digest;

uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

while (true) {
if (sleeptime != utime_t()) {
lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
sleeptime.sleep();
}
bufferlist bl;
handle.reset_tp_timeout();
r = store->read(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos,
stride, bl,
fadvise_flags);
if (r < 0)
break;
if (bl.length() % sinfo.get_chunk_size()) {
r = -EIO;
break;
}
pos += r;
if (!skip_data_digest) {
h << bl;
}
if ((unsigned)r < stride)
break;
utime_t sleeptime;
sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);

if (pos.data_pos == 0) {
pos.data_hash = bufferhash(pos.seed);
}

if (r == -EIO) {
dout(0) << "_scan_list " << poid << " got "
<< r << " on read, read_error" << dendl;
uint64_t stride = cct->_conf->osd_deep_scrub_stride;
if (stride % sinfo.get_chunk_size())
stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());

bufferlist bl;
r = store->read(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos.data_pos,
stride, bl,
fadvise_flags);
if (r < 0) {
dout(20) << __func__ << " " << poid << " got "
<< r << " on read, read_error" << dendl;
o.read_error = true;
return;
return 0;
}
if (bl.length() % sinfo.get_chunk_size()) {
dout(20) << __func__ << " " << poid << " got "
<< r << " on read, not chunk size " << sinfo.get_chunk_size() << " aligned"
<< dendl;
o.read_error = true;
return 0;
}
if (r > 0 && !skip_data_digest) {
pos.data_hash << bl;
}
pos.data_pos += r;
if (r == (int)stride) {
return -EINPROGRESS;
}

ECUtil::HashInfoRef hinfo = get_hash_info(poid, false, &o.attrs);

@@ -2477,21 +2476,22 @@ void ECBackend::be_deep_scrub(
dout(0) << "_scan_list " << poid << " could not retrieve hash info" << dendl;
o.read_error = true;
o.digest_present = false;
return;
return 0;
} else {
if (!get_parent()->get_pool().allows_ecoverwrites()) {
assert(hinfo->has_chunk_hash());
if (hinfo->get_total_chunk_size() != pos) {
if (hinfo->get_total_chunk_size() != (unsigned)pos.data_pos) {
dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
o.ec_size_mismatch = true;
return;
return 0;
}

if (!skip_data_digest &&
hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) !=
pos.data_hash.digest()) {
dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
o.ec_hash_mismatch = true;
return;
return 0;
}

/* We checked above that we match our own stored hash. We cannot

@@ -2511,6 +2511,7 @@ void ECBackend::be_deep_scrub(
}
}

o.omap_digest = seed;
o.omap_digest = pos.seed;
o.omap_digest_present = true;
return 0;
}
@@ -671,12 +671,11 @@ public:

bool auto_repair_supported() const override { return true; }

void be_deep_scrub(
const hobject_t &obj,
uint32_t seed,
ScrubMap::object &o,
ThreadPool::TPHandle &handle,
ScrubMap* const map = nullptr) override;
int be_deep_scrub(
const hobject_t &poid,
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o) override;
uint64_t be_get_ondisk_size(uint64_t logical_size) override {
return sinfo.logical_to_next_chunk_offset(logical_size);
}
@@ -1252,10 +1252,12 @@ bool OSDService::can_inc_scrubs_pending()

if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
dout(20) << __func__ << " " << scrubs_pending << " -> " << (scrubs_pending+1)
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active
<< ")" << dendl;
can_inc = true;
} else {
dout(20) << __func__ << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
dout(20) << __func__ << " " << scrubs_pending << " + " << scrubs_active
<< " active >= max " << cct->_conf->osd_max_scrubs << dendl;
}

return can_inc;
src/osd/PG.cc | 176
@@ -4189,45 +4189,62 @@ void PG::_repair_oinfo_oid(ScrubMap &smap)
}
}
}

/*
* build a scrub map over a chunk without releasing the lock
* only used by chunky scrub
*/
int PG::build_scrub_map_chunk(
ScrubMap &map,
hobject_t start, hobject_t end, bool deep, uint32_t seed,
ScrubMapBuilder &pos,
hobject_t start,
hobject_t end,
bool deep,
uint32_t seed,
ThreadPool::TPHandle &handle)
{
dout(10) << __func__ << " [" << start << "," << end << ") "
<< " seed " << seed << dendl;
<< " pos " << pos
<< dendl;

map.valid_through = info.last_update;
// start
while (pos.empty()) {
pos.deep = deep;
pos.seed = seed;
map.valid_through = info.last_update;
osr->flush();

osr->flush();

// objects
vector<hobject_t> ls;
vector<ghobject_t> rollback_obs;
osr->flush();
int ret = get_pgbackend()->objects_list_range(
start,
end,
0,
&ls,
&rollback_obs);
if (ret < 0) {
dout(5) << "objects_list_range error: " << ret << dendl;
return ret;
// objects
vector<ghobject_t> rollback_obs;
pos.ret = get_pgbackend()->objects_list_range(
start,
end,
0,
&pos.ls,
&rollback_obs);
if (pos.ret < 0) {
dout(5) << "objects_list_range error: " << pos.ret << dendl;
return pos.ret;
}
if (pos.ls.empty()) {
break;
}
_scan_rollback_obs(rollback_obs, handle);
pos.pos = 0;
return -EINPROGRESS;
}

// scan objects
while (!pos.done()) {
int r = get_pgbackend()->be_scan_list(map, pos);
if (r == -EINPROGRESS) {
return r;
}
}

get_pgbackend()->be_scan_list(map, ls, deep, seed, handle);
_scan_rollback_obs(rollback_obs, handle);
// finish
dout(20) << __func__ << " finishing" << dendl;
assert(pos.done());
_scan_snaps(map);
_repair_oinfo_oid(map);

dout(20) << __func__ << " done" << dendl;
dout(20) << __func__ << " done, got " << map.objects.size() << " items"
<< dendl;
return 0;
}

@@ -4313,8 +4330,6 @@ void PG::replica_scrub(
return;
}

ScrubMap map;

assert(msg->chunky);
if (last_update_applied < msg->scrub_to) {
dout(10) << "waiting for last_update_applied to catch up" << dendl;

@@ -4328,24 +4343,16 @@ void PG::replica_scrub(
return;
}

// compensate for hobject_t's with wrong pool from sloppy hammer OSDs
hobject_t start = msg->start;
hobject_t end = msg->end;
if (!start.is_max())
start.pool = info.pgid.pool();
if (!end.is_max())
end.pool = info.pgid.pool();
scrubber.state = Scrubber::BUILD_MAP_REPLICA;
scrubber.replica_scrub_start = msg->min_epoch;
scrubber.start = msg->start;
scrubber.end = msg->end;
scrubber.deep = msg->deep;
scrubber.epoch_start = info.history.same_interval_since;

build_scrub_map_chunk(
map, start, end, msg->deep, msg->seed,
handle);
scrubber.replica_scrubmap_pos.reset();

MOSDRepScrubMap *reply = new MOSDRepScrubMap(
spg_t(info.pgid.pgid, get_primary().shard),
msg->map_epoch,
pg_whoami);
encode(map, reply->get_data());
osd->send_message_osd_cluster(reply, msg->get_connection());
requeue_scrub(false);
}

/* Scrub:

@@ -4402,6 +4409,13 @@ void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
scrub_queued = false;
scrubber.needs_sleep = true;

// for the replica
if (!is_primary() &&
scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
chunky_scrub(handle);
return;
}

if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
state_clear(PG_STATE_SCRUBBING);

@@ -4522,6 +4536,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
switch (scrubber.state) {
case PG::Scrubber::INACTIVE:
dout(10) << "scrub start" << dendl;
assert(is_primary());

publish_stats_to_osd();
scrubber.epoch_start = info.history.same_interval_since;

@@ -4579,7 +4594,9 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
* left end of the range if we are a tier because they may legitimately
* not exist (see _scrub).
*/
int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min);
int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
scrubber.preempt_divisor);
int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max);
hobject_t start = scrubber.start;
hobject_t candidate_end;
vector<hobject_t> objects;

@@ -4587,7 +4604,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
ret = get_pgbackend()->objects_list_partial(
start,
min,
std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max),
max,
&objects,
&candidate_end);
assert(ret >= 0);

@@ -4679,30 +4696,43 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
break;

case PG::Scrubber::WAIT_LAST_UPDATE:
if (last_update_applied >= scrubber.subset_last_update) {
scrubber.state = PG::Scrubber::BUILD_MAP;
} else {
if (last_update_applied < scrubber.subset_last_update) {
// will be requeued by op_applied
dout(15) << "wait for writes to flush" << dendl;
done = true;
}
break;
}

scrubber.state = PG::Scrubber::BUILD_MAP;
scrubber.primary_scrubmap_pos.reset();
break;

case PG::Scrubber::BUILD_MAP:
assert(last_update_applied >= scrubber.subset_last_update);

// build my own scrub map
ret = build_scrub_map_chunk(scrubber.primary_scrubmap,
scrubber.start, scrubber.end,
scrubber.deep, scrubber.seed,
handle);
if (ret < 0) {
dout(5) << "error building scrub map: " << ret << ", aborting" << dendl;
ret = build_scrub_map_chunk(
scrubber.primary_scrubmap,
scrubber.primary_scrubmap_pos,
scrubber.start, scrubber.end,
scrubber.deep, scrubber.seed,
handle);
if (ret == -EINPROGRESS) {
requeue_scrub();
done = true;
break;
}
scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
break;

case PG::Scrubber::BUILD_MAP_DONE:
if (scrubber.primary_scrubmap_pos.ret < 0) {
dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
<< ", aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
return;
}

dout(10) << __func__ << " waiting_on_whom was "
<< scrubber.waiting_on_whom << dendl;
assert(scrubber.waiting_on_whom.count(pg_whoami));

@@ -4767,6 +4797,38 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)

break;

case PG::Scrubber::BUILD_MAP_REPLICA:
// build my own scrub map
ret = build_scrub_map_chunk(
scrubber.replica_scrubmap,
scrubber.replica_scrubmap_pos,
scrubber.start, scrubber.end,
scrubber.deep, scrubber.seed,
handle);
if (ret == -EINPROGRESS) {
requeue_scrub();
done = true;
break;
}
// reply
{
MOSDRepScrubMap *reply = new MOSDRepScrubMap(
spg_t(info.pgid.pgid, get_primary().shard),
scrubber.replica_scrub_start,
pg_whoami);
::encode(scrubber.replica_scrubmap, reply->get_data());
osd->send_message_osd_cluster(
get_primary().osd, reply,
scrubber.replica_scrub_start);
}
scrubber.state = PG::Scrubber::INACTIVE;
scrubber.replica_scrubmap = ScrubMap();
scrubber.replica_scrubmap_pos = ScrubMapBuilder();
scrubber.start = hobject_t();
scrubber.end = hobject_t();
done = true;
break;

default:
ceph_abort();
}
src/osd/PG.h | 13
@@ -1459,6 +1459,10 @@ public:
int large_omap_objects = 0;
int fixed;
ScrubMap primary_scrubmap;
ScrubMapBuilder primary_scrubmap_pos;
epoch_t replica_scrub_start = 0;
ScrubMap replica_scrubmap;
ScrubMapBuilder replica_scrubmap_pos;
map<pg_shard_t, ScrubMap> received_maps;
OpRequestRef active_rep_scrub;
utime_t scrub_reg_stamp; // stamp we registered for

@@ -1501,10 +1505,12 @@ public:
WAIT_PUSHES,
WAIT_LAST_UPDATE,
BUILD_MAP,
BUILD_MAP_DONE,
WAIT_REPLICAS,
COMPARE_MAPS,
WAIT_DIGEST_UPDATES,
FINISH,
BUILD_MAP_REPLICA,
} state;

std::unique_ptr<Scrub::Store> store;

@@ -1535,10 +1541,12 @@ public:
case WAIT_PUSHES: ret = "WAIT_PUSHES"; break;
case WAIT_LAST_UPDATE: ret = "WAIT_LAST_UPDATE"; break;
case BUILD_MAP: ret = "BUILD_MAP"; break;
case BUILD_MAP_DONE: ret = "BUILD_MAP_DONE"; break;
case WAIT_REPLICAS: ret = "WAIT_REPLICAS"; break;
case COMPARE_MAPS: ret = "COMPARE_MAPS"; break;
case WAIT_DIGEST_UPDATES: ret = "WAIT_DIGEST_UPDATES"; break;
case FINISH: ret = "FINISH"; break;
case BUILD_MAP_REPLICA: ret = "BUILD_MAP_REPLICA"; break;
}
return ret;
}

@@ -1580,6 +1588,10 @@ public:
missing.clear();
authoritative.clear();
num_digest_updates_pending = 0;
primary_scrubmap = ScrubMap();
primary_scrubmap_pos.reset();
replica_scrubmap = ScrubMap();
replica_scrubmap_pos.reset();
cleaned_meta_map = ScrubMap();
sleeping = false;
needs_sleep = true;

@@ -1618,6 +1630,7 @@ protected:
uint32_t seed);
int build_scrub_map_chunk(
ScrubMap &map,
ScrubMapBuilder &pos,
hobject_t start, hobject_t end, bool deep, uint32_t seed,
ThreadPool::TPHandle &handle);
/**
@@ -570,58 +570,53 @@ PGBackend *PGBackend::build_pg_backend(
}
}

/*
* pg lock may or may not be held
*/
void PGBackend::be_scan_list(
ScrubMap &map, const vector<hobject_t> &ls, bool deep, uint32_t seed,
ThreadPool::TPHandle &handle)
int PGBackend::be_scan_list(
ScrubMap &map,
ScrubMapBuilder &pos)
{
dout(10) << __func__ << " scanning " << ls.size() << " objects"
<< (deep ? " deeply" : "") << dendl;
int i = 0;
for (vector<hobject_t>::const_iterator p = ls.begin();
p != ls.end();
++p, i++) {
handle.reset_tp_timeout();
hobject_t poid = *p;
dout(10) << __func__ << " " << pos << dendl;
assert(!pos.done());
assert(pos.pos < pos.ls.size());
hobject_t& poid = pos.ls[pos.pos];

struct stat st;
int r = store->stat(
struct stat st;
int r = store->stat(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
&st,
true);
if (r == 0) {
ScrubMap::object &o = map.objects[poid];
o.size = st.st_size;
assert(!o.negative);
store->getattrs(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
&st,
true);
if (r == 0) {
ScrubMap::object &o = map.objects[poid];
o.size = st.st_size;
assert(!o.negative);
store->getattrs(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
o.attrs);
o.attrs);

// calculate the CRC32 on deep scrubs
if (deep) {
be_deep_scrub(*p, seed, o, handle, &map);
}

dout(25) << __func__ << " " << poid << dendl;
} else if (r == -ENOENT) {
dout(25) << __func__ << " " << poid << " got " << r
<< ", skipping" << dendl;
} else if (r == -EIO) {
dout(25) << __func__ << " " << poid << " got " << r
<< ", stat_error" << dendl;
ScrubMap::object &o = map.objects[poid];
o.stat_error = true;
} else {
derr << __func__ << " got: " << cpp_strerror(r) << dendl;
ceph_abort();
if (pos.deep) {
r = be_deep_scrub(poid, map, pos, o);
}
dout(25) << __func__ << " " << poid << dendl;
} else if (r == -ENOENT) {
dout(25) << __func__ << " " << poid << " got " << r
<< ", skipping" << dendl;
} else if (r == -EIO) {
dout(25) << __func__ << " " << poid << " got " << r
<< ", stat_error" << dendl;
ScrubMap::object &o = map.objects[poid];
o.stat_error = true;
} else {
derr << __func__ << " got: " << cpp_strerror(r) << dendl;
ceph_abort();
}
if (r == -EINPROGRESS) {
return -EINPROGRESS;
}
pos.next_object();
return 0;
}

bool PGBackend::be_compare_scrub_objects(
@@ -132,6 +132,7 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
eversion_t v,
Context *on_complete) = 0;

/**
* Bless a context
*

@@ -557,9 +558,9 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
Context *on_complete, bool fast_read = false) = 0;

virtual bool auto_repair_supported() const = 0;
void be_scan_list(
ScrubMap &map, const vector<hobject_t> &ls, bool deep, uint32_t seed,
ThreadPool::TPHandle &handle);
int be_scan_list(
ScrubMap &map,
ScrubMapBuilder &pos);
bool be_compare_scrub_objects(
pg_shard_t auth_shard,
const ScrubMap::object &auth,

@@ -590,12 +591,11 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
ostream &errorstream);
virtual uint64_t be_get_ondisk_size(
uint64_t logical_size) = 0;
virtual void be_deep_scrub(
const hobject_t &poid,
uint32_t seed,
ScrubMap::object &o,
ThreadPool::TPHandle &handle,
ScrubMap* const map = nullptr) = 0;
virtual int be_deep_scrub(
const hobject_t &oid,
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o) = 0;
void be_large_omap_check(
const map<pg_shard_t,ScrubMap*> &maps,
const set<hobject_t> &master_set,
@@ -651,135 +651,145 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)
}
}

void ReplicatedBackend::be_deep_scrub(
int ReplicatedBackend::be_deep_scrub(
const hobject_t &poid,
uint32_t seed,
ScrubMap::object &o,
ThreadPool::TPHandle &handle,
ScrubMap* const map)
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o)
{
dout(10) << __func__ << " " << poid << " seed "
<< std::hex << seed << std::dec << dendl;
bufferhash h(seed), oh(seed);
bufferlist bl, hdrbl;
dout(10) << __func__ << " " << poid << " pos " << pos << dendl;
int r;
__u64 pos = 0;
bool skip_data_digest = store->has_builtin_csum() &&
g_conf->get_val<bool>("osd_skip_data_digest");
utime_t sleeptime;
sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);

uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;

while (true) {
if (sleeptime != utime_t()) {
lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
sleeptime.sleep();
bool skip_data_digest = store->has_builtin_csum() &&
g_conf->get_val<bool>("osd_skip_data_digest");

utime_t sleeptime;
sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
if (sleeptime != utime_t()) {
lgeneric_derr(cct) << __func__ << " sleeping for " << sleeptime << dendl;
sleeptime.sleep();
}

assert(poid == pos.ls[pos.pos]);
if (!pos.data_done()) {
if (pos.data_pos == 0) {
pos.data_hash = bufferhash(pos.seed);
}
handle.reset_tp_timeout();

bufferlist bl;
r = store->read(
ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos,
cct->_conf->osd_deep_scrub_stride, bl,
fadvise_flags);
if (r <= 0)
break;

ch,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
pos.data_pos,
cct->_conf->osd_deep_scrub_stride, bl,
fadvise_flags);
if (r < 0) {
dout(20) << __func__ << " " << poid << " got "
<< r << " on read, read_error" << dendl;
o.read_error = true;
return 0;
}
if (r > 0 && !skip_data_digest) {
pos.data_hash << bl;
}
pos.data_pos += r;
if (r == cct->_conf->osd_deep_scrub_stride) {
dout(20) << __func__ << " " << poid << " more data, digest so far 0x"
<< std::hex << pos.data_hash.digest() << std::dec << dendl;
return -EINPROGRESS;
}
// done with bytes
pos.data_pos = -1;
if (!skip_data_digest) {
h << bl;
o.digest = pos.data_hash.digest();
o.digest_present = true;
}
pos += bl.length();
bl.clear();
}
if (r == -EIO) {
dout(25) << __func__ << " " << poid << " got "
<< r << " on read, read_error" << dendl;
o.read_error = true;
return;
}
if (!skip_data_digest) {
o.digest = h.digest();
o.digest_present = true;
dout(20) << __func__ << " " << poid << " done with data, digest 0x"
<< std::hex << o.digest << std::dec << dendl;
}

bl.clear();
r = store->omap_get_header(
coll,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
&hdrbl, true);
// NOTE: bobtail to giant, we would crc the head as (len, head).
// that changes at the same time we start using a non-zero seed.
if (r == 0 && hdrbl.length()) {
dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
<< dendl;
if (seed == 0) {
// legacy
bufferlist bl;
encode(hdrbl, bl);
oh << bl;
} else {
oh << hdrbl;
// omap header
if (pos.omap_pos.empty()) {
pos.omap_hash = bufferhash(pos.seed);

bufferlist hdrbl;
r = store->omap_get_header(
coll,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
&hdrbl, true);
if (r == -EIO) {
dout(20) << __func__ << " " << poid << " got "
<< r << " on omap header read, read_error" << dendl;
o.read_error = true;
return 0;
}
if (r == 0 && hdrbl.length()) {
dout(25) << "CRC header " << string(hdrbl.c_str(), hdrbl.length())
<< dendl;
pos.omap_hash << hdrbl;
}
} else if (r == -EIO) {
dout(25) << __func__ << " " << poid << " got "
<< r << " on omap header read, read_error" << dendl;
o.read_error = true;
return;
}

// omap
ObjectMap::ObjectMapIterator iter = store->get_omap_iterator(
coll,
ghobject_t(
poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
assert(iter);
uint64_t keys_scanned = 0;
uint64_t value_sum = 0;
for (iter->seek_to_first(); iter->status() == 0 && iter->valid();
iter->next(false)) {
++keys_scanned;
handle.reset_tp_timeout();

dout(25) << "CRC key " << iter->key() << " value:\n";
iter->value().hexdump(*_dout);
*_dout << dendl;

value_sum += iter->value().length();
if (pos.omap_pos.length()) {
iter->lower_bound(pos.omap_pos);
} else {
iter->seek_to_first();
}
int max = g_conf->osd_deep_scrub_keys;
while (iter->status() == 0 && iter->valid()) {
pos.omap_bytes += iter->value().length();
++pos.omap_keys;

// fixme: we can do this more efficiently.
bufferlist bl;
encode(iter->key(), bl);
encode(iter->value(), bl);
oh << bl;
bl.clear();
pos.omap_hash << bl;

iter->next();

if (iter->valid() && max == 0) {
pos.omap_pos = iter->key();
return -EINPROGRESS;
}
if (iter->status() < 0) {
dout(25) << __func__ << " " << poid
<< " on omap scan, db status error" << dendl;
o.read_error = true;
return 0;
}
}

if (keys_scanned > cct->_conf->get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_key_threshold") ||
value_sum > cct->_conf->get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_value_sum_threshold")) {
if (pos.omap_keys > cct->_conf->get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_key_threshold") ||
pos.omap_bytes > cct->_conf->get_val<uint64_t>(
"osd_deep_scrub_large_omap_object_value_sum_threshold")) {
dout(25) << __func__ << " " << poid
<< " large omap object detected. Object has " << keys_scanned
<< " keys and size " << value_sum << " bytes" << dendl;
<< " large omap object detected. Object has " << pos.omap_keys
<< " keys and size " << pos.omap_bytes << " bytes" << dendl;
o.large_omap_object_found = true;
o.large_omap_object_key_count = keys_scanned;
o.large_omap_object_value_size = value_sum;
map->has_large_omap_object_errors = true;
o.large_omap_object_key_count = pos.omap_keys;
o.large_omap_object_value_size = pos.omap_bytes;
map.has_large_omap_object_errors = true;
}

if (iter->status() < 0) {
dout(25) << __func__ << " " << poid
<< " on omap scan, db status error" << dendl;
o.read_error = true;
return;
}

//Store final calculated CRC32 of omap header & key/values
o.omap_digest = oh.digest();
o.omap_digest = pos.omap_hash.digest();
o.omap_digest_present = true;
dout(20) << __func__ << " " << poid << " omap_digest "
dout(20) << __func__ << " done with " << poid << " omap_digest "
<< std::hex << o.omap_digest << std::dec << dendl;

// done!
return 0;
}

void ReplicatedBackend::_do_push(OpRequestRef op)
@@ -428,12 +428,11 @@ private:
bool auto_repair_supported() const override { return false; }

void be_deep_scrub(
const hobject_t &obj,
uint32_t seed,
ScrubMap::object &o,
ThreadPool::TPHandle &handle,
ScrubMap* const map = nullptr) override;
int be_deep_scrub(
const hobject_t &poid,
ScrubMap &map,
ScrubMapBuilder &pos,
ScrubMap::object &o) override;
uint64_t be_get_ondisk_size(uint64_t logical_size) override { return logical_size; }
};
@@ -4960,6 +4960,61 @@ struct ScrubMap {
WRITE_CLASS_ENCODER(ScrubMap::object)
WRITE_CLASS_ENCODER(ScrubMap)

struct ScrubMapBuilder {
bool deep = false;
uint32_t seed = 0;
vector<hobject_t> ls;
size_t pos = 0;
int64_t data_pos = 0;
string omap_pos;
int ret = 0;
bufferhash data_hash, omap_hash; ///< accumulatinng hash value
uint64_t omap_keys = 0;
uint64_t omap_bytes = 0;

bool empty() {
return ls.empty();
}
bool done() {
return pos >= ls.size();
}
void reset() {
*this = ScrubMapBuilder();
}

bool data_done() {
return data_pos < 0;
}

void next_object() {
++pos;
data_pos = 0;
omap_pos.clear();
omap_keys = 0;
omap_bytes = 0;
}

friend ostream& operator<<(ostream& out, const ScrubMapBuilder& pos) {
out << "(" << pos.pos << "/" << pos.ls.size();
if (pos.pos < pos.ls.size()) {
out << " " << pos.ls[pos.pos];
}
if (pos.data_pos < 0) {
out << " byte " << pos.data_pos;
}
if (!pos.omap_pos.empty()) {
out << " key " << pos.omap_pos;
}
if (pos.deep) {
out << " deep";
}
if (pos.ret) {
out << " ret " << pos.ret;
}
return out << ")";
}
};

struct OSDOp {
ceph_osd_op op;
sobject_t soid;
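The ScrubMapBuilder introduced above is the cursor that survives across requeues: `pos` indexes the current object in `ls`, `data_pos` and `omap_pos` record progress within that object, and `next_object()` clears the per-object state. A rough standalone illustration of those cursor semantics, using a trimmed-down stand-in struct (simple strings and counters replace `hobject_t` and `bufferhash`; the object names and stride value are invented for the example):

```cpp
// Sketch only: a pared-down ScrubMapBuilder-style cursor showing how
// pos / next_object() / done() drive iteration across resumable passes.
#include <iostream>
#include <string>
#include <vector>

struct MiniBuilder {
  std::vector<std::string> ls;   // stands in for vector<hobject_t>
  size_t pos = 0;                // index of the current object
  int64_t data_pos = 0;          // byte offset within the object; -1 = data done
  std::string omap_pos;          // last omap key reached, "" = not started

  bool done() const { return pos >= ls.size(); }
  void next_object() {           // same reset as ScrubMapBuilder::next_object()
    ++pos;
    data_pos = 0;
    omap_pos.clear();
  }
};

int main() {
  MiniBuilder b;
  b.ls = {"rbd_data.1", "rbd_data.2"};
  while (!b.done()) {
    // pretend each object needs two passes over its data before finishing
    if (b.data_pos >= 0 && b.data_pos < 2 * 65536) {
      b.data_pos += 65536;       // one "stride" of data hashed per pass
      std::cout << b.ls[b.pos] << " data_pos " << b.data_pos << "\n";
      continue;                  // in the OSD this is where -EINPROGRESS requeues
    }
    std::cout << b.ls[b.pos] << " complete\n";
    b.next_object();
  }
}
```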