1
0
mirror of https://github.com/ceph/ceph synced 2025-01-10 21:20:46 +00:00

Merge pull request from athanatos/wip-17831

osd/ReplicatedBackend: take read locks for clone sources during recovery

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
This commit is contained in:
Samuel Just 2017-01-14 18:48:00 -08:00 committed by GitHub
commit f634eb3781
5 changed files with 175 additions and 69 deletions

View File

@ -188,6 +188,12 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
const hobject_t &hoid,
map<string, bufferlist> &attrs) = 0;
virtual bool try_lock_for_read(
const hobject_t &hoid,
ObcLockManager &manager) = 0;
virtual void release_locks(ObcLockManager &manager) = 0;
virtual void op_applied(
const eversion_t &applied_version) = 0;

View File

@ -335,11 +335,26 @@ public:
/// @return the pg_pool_t describing this PG's pool
const pg_pool_t &get_pool() const override {
  return pool.info;
}
/// Fetch the object context for hoid, handing the supplied attrs to
/// get_object_context (assumes the second argument enables creation —
/// confirm against get_object_context's signature).
ObjectContextRef get_obc(
  const hobject_t &hoid,
  map<string, bufferlist> &attrs) override {
  ObjectContextRef ret = get_object_context(hoid, true, &attrs);
  return ret;
}
bool try_lock_for_read(
const hobject_t &hoid,
ObcLockManager &manager) override {
auto obc = get_object_context(hoid, false, nullptr);
if (!obc)
return false;
return manager.try_get_read_lock(hoid, obc);
}
/// Release every lock tracked by manager.  Marked override for
/// consistency with the sibling Listener overrides above and so the
/// compiler checks it against the pure virtual in the interface.
void release_locks(ObcLockManager &manager) override {
  release_object_locks(manager);
}
void pgb_set_object_snap_mapping(
const hobject_t &soid,
const set<snapid_t> &snaps,
@ -352,7 +367,6 @@ public:
return clear_object_snap_mapping(t, soid);
}
void log_operation(
const vector<pg_log_entry_t> &logv,
boost::optional<pg_hit_set_history_t> &hset_history,

View File

@ -166,9 +166,8 @@ void ReplicatedBackend::check_recovery_sources(const OSDMapRef& osdmap)
for (set<hobject_t, hobject_t::BitwiseComparator>::iterator j = i->second.begin();
j != i->second.end();
++j) {
assert(pulling.count(*j) == 1);
get_parent()->cancel_pull(*j);
pulling.erase(*j);
clear_pull(pulling.find(*j), false);
}
pull_from_peer.erase(i++);
} else {
@ -274,7 +273,16 @@ bool ReplicatedBackend::handle_message(
/// Discard all in-flight push/pull recovery state for this backend.
void ReplicatedBackend::clear_recovery_state()
{
  // clear pushing/pulling maps
  // Release the read locks held on clone sources by each in-progress
  // push before dropping the bookkeeping, so the locks are not leaked.
  for (auto &&i: pushing) {
    for (auto &&j: i.second) {
      get_parent()->release_locks(j.second.lock_manager);
    }
  }
  pushing.clear();
  // Likewise release the locks held by in-progress pulls.
  for (auto &&i: pulling) {
    get_parent()->release_locks(i.second.lock_manager);
  }
  pulling.clear();
  pull_from_peer.clear();
}
@ -859,25 +867,18 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
ReplicatedBackend *bc;
list<hobject_t> to_continue;
list<ReplicatedBackend::pull_complete_info> to_continue;
int priority;
C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
: bc(bc), priority(priority) {}
void finish(ThreadPool::TPHandle &handle) {
ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
for (list<hobject_t>::iterator i =
to_continue.begin();
i != to_continue.end();
++i) {
map<hobject_t, ReplicatedBackend::PullInfo, hobject_t::BitwiseComparator>::iterator j =
bc->pulling.find(*i);
assert(j != bc->pulling.end());
if (!bc->start_pushes(*i, j->second.obc, h)) {
for (auto &&i: to_continue) {
if (!bc->start_pushes(i.hoid, i.obc, h)) {
bc->get_parent()->on_global_recover(
*i, j->second.stat);
i.hoid, i.stat);
}
bc->pulling.erase(*i);
handle.reset_tp_timeout();
}
bc->run_recovery_op(h, priority);
@ -894,7 +895,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
vector<PullOp> replies(1);
ObjectStore::Transaction t;
list<hobject_t> to_continue;
list<pull_complete_info> to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
@ -1255,7 +1256,8 @@ void ReplicatedBackend::calc_head_subsets(
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
ObcLockManager &manager)
{
dout(10) << "calc_head_subsets " << head
<< " clone_overlap " << snapset.clone_overlap << dendl;
@ -1285,7 +1287,8 @@ void ReplicatedBackend::calc_head_subsets(
c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
if (!missing.is_missing(c) &&
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_head_subsets " << head << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
@ -1299,6 +1302,7 @@ void ReplicatedBackend::calc_head_subsets(
if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
get_parent()->release_locks(manager);
clone_subsets.clear();
cloning.clear();
}
@ -1316,7 +1320,8 @@ void ReplicatedBackend::calc_clone_subsets(
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
ObcLockManager &manager)
{
dout(10) << "calc_clone_subsets " << soid
<< " clone_overlap " << snapset.clone_overlap << dendl;
@ -1350,7 +1355,8 @@ void ReplicatedBackend::calc_clone_subsets(
c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
if (!missing.is_missing(c) &&
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_clone_subsets " << soid << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
@ -1370,7 +1376,8 @@ void ReplicatedBackend::calc_clone_subsets(
c.snap = snapset.clones[j];
next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
if (!missing.is_missing(c) &&
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_clone_subsets " << soid << " has next " << c
<< " overlap " << next << dendl;
clone_subsets[c] = next;
@ -1383,6 +1390,7 @@ void ReplicatedBackend::calc_clone_subsets(
if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
get_parent()->release_locks(manager);
clone_subsets.clear();
cloning.clear();
}
@ -1444,6 +1452,7 @@ void ReplicatedBackend::prepare_pull(
}
ObjectRecoveryInfo recovery_info;
ObcLockManager lock_manager;
if (soid.is_snap()) {
assert(!get_parent()->get_local_missing().is_missing(
@ -1455,10 +1464,12 @@ void ReplicatedBackend::prepare_pull(
SnapSetContext *ssc = headctx->ssc;
assert(ssc);
dout(10) << " snapset " << ssc->snapset << dendl;
calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
get_info().last_backfill,
recovery_info.copy_subset,
recovery_info.clone_subset);
calc_clone_subsets(
ssc->snapset, soid, get_parent()->get_local_missing(),
get_info().last_backfill,
recovery_info.copy_subset,
recovery_info.clone_subset,
lock_manager);
// FIXME: this may overestimate if we are pulling multiple clones in parallel...
dout(10) << " pulling " << recovery_info << dendl;
@ -1486,10 +1497,13 @@ void ReplicatedBackend::prepare_pull(
assert(!pulling.count(soid));
pull_from_peer[fromshard].insert(soid);
PullInfo &pi = pulling[soid];
pi.from = fromshard;
pi.soid = soid;
pi.head_ctx = headctx;
pi.recovery_info = op.recovery_info;
pi.recovery_progress = op.recovery_progress;
pi.cache_dont_need = h->cache_dont_need;
pi.lock_manager = std::move(lock_manager);
}
/*
@ -1509,6 +1523,7 @@ void ReplicatedBackend::prep_push_to_replica(
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subsets;
interval_set<uint64_t> data_subset;
ObcLockManager lock_manager;
// are we doing a clone on the replica?
if (soid.snap && soid.snap < CEPH_NOSNAP) {
hobject_t head = soid;
@ -1537,10 +1552,12 @@ void ReplicatedBackend::prep_push_to_replica(
map<pg_shard_t, pg_info_t>::const_iterator pi =
get_parent()->get_shard_info().find(peer);
assert(pi != get_parent()->get_shard_info().end());
calc_clone_subsets(ssc->snapset, soid,
pm->second,
pi->second.last_backfill,
data_subset, clone_subsets);
calc_clone_subsets(
ssc->snapset, soid,
pm->second,
pi->second.last_backfill,
data_subset, clone_subsets,
lock_manager);
} else if (soid.snap == CEPH_NOSNAP) {
// pushing head or unversioned object.
// base this on partially on replica's clones?
@ -1551,10 +1568,20 @@ void ReplicatedBackend::prep_push_to_replica(
obc,
ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
get_parent()->get_shard_info().find(peer)->second.last_backfill,
data_subset, clone_subsets);
data_subset, clone_subsets,
lock_manager);
}
prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need);
prep_push(
obc,
soid,
peer,
oi.version,
data_subset,
clone_subsets,
pop,
cache_dont_need,
std::move(lock_manager));
}
void ReplicatedBackend::prep_push(ObjectContextRef obc,
@ -1568,7 +1595,7 @@ void ReplicatedBackend::prep_push(ObjectContextRef obc,
prep_push(obc, soid, peer,
obc->obs.oi.version, data_subset, clone_subsets,
pop, cache_dont_need);
pop, cache_dont_need, ObcLockManager());
}
void ReplicatedBackend::prep_push(
@ -1578,7 +1605,8 @@ void ReplicatedBackend::prep_push(
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *pop,
bool cache_dont_need)
bool cache_dont_need,
ObcLockManager &&lock_manager)
{
get_parent()->begin_peer_recover(peer, soid);
// take note.
@ -1594,6 +1622,7 @@ void ReplicatedBackend::prep_push(
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
pi.recovery_progress.omap_complete = 0;
pi.lock_manager = std::move(lock_manager);
ObjectRecoveryProgress new_progress;
int r = build_push_op(pi.recovery_info,
@ -1732,7 +1761,8 @@ void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
const ObjectRecoveryInfo& recovery_info,
SnapSetContext *ssc)
SnapSetContext *ssc,
ObcLockManager &manager)
{
if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
return recovery_info;
@ -1740,17 +1770,19 @@ ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
new_info.copy_subset.clear();
new_info.clone_subset.clear();
assert(ssc);
calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
get_info().last_backfill,
new_info.copy_subset, new_info.clone_subset);
get_parent()->release_locks(manager); // might already have locks
calc_clone_subsets(
ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
get_info().last_backfill,
new_info.copy_subset, new_info.clone_subset,
manager);
return new_info;
}
bool ReplicatedBackend::handle_pull_response(
pg_shard_t from, PushOp &pop, PullOp *response,
list<hobject_t> *to_continue,
ObjectStore::Transaction *t
)
list<pull_complete_info> *to_continue,
ObjectStore::Transaction *t)
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
@ -1795,7 +1827,10 @@ bool ReplicatedBackend::handle_pull_response(
}
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
pi.recovery_info.oi = pi.obc->obs.oi;
pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
pi.recovery_info = recalc_subsets(
pi.recovery_info,
pi.obc->ssc,
pi.lock_manager);
}
@ -1831,12 +1866,10 @@ bool ReplicatedBackend::handle_pull_response(
if (complete) {
pi.stat.num_objects_recovered++;
to_continue->push_back(hoid);
to_continue->push_back({hoid, pi.obc, pi.stat});
get_parent()->on_local_recover(
hoid, pi.recovery_info, pi.obc, t);
pull_from_peer[from].erase(hoid);
if (pull_from_peer[from].empty())
pull_from_peer.erase(from);
clear_pull(pulling.find(hoid));
return false;
} else {
response->soid = pop.soid;
@ -2183,6 +2216,7 @@ bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, Push
stat.num_keys_recovered = reply->omap_entries.size();
stat.num_objects_recovered = 1;
get_parent()->release_locks(pi->lock_manager);
pushing[soid].erase(peer);
pi = NULL;
@ -2329,7 +2363,7 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
if (is_primary()) {
PullOp resp;
RPGHandle *h = _open_recovery_op();
list<hobject_t> to_continue;
list<pull_complete_info> to_continue;
bool more = handle_pull_response(
m->from, pop, &resp,
&to_continue, &t);
@ -2370,10 +2404,22 @@ void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
{
list<pg_shard_t> fl = { from };
get_parent()->failed_push(fl, soid);
pull_from_peer[from].erase(soid);
if (pull_from_peer[from].empty())
pull_from_peer.erase(from);
pulling.erase(soid);
clear_pull(pulling.find(soid));
}
/// Tear down the state for a single in-flight pull: optionally drop the
/// peer -> object reverse mapping, release any clone-source read locks,
/// and erase the pulling entry itself.
void ReplicatedBackend::clear_pull(
  map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
  bool clear_pull_from_peer)
{
  PullInfo &pi = piter->second;
  if (clear_pull_from_peer) {
    // Remove the object from the peer's set; drop the peer entry
    // entirely once its set becomes empty.
    const pg_shard_t from = pi.from;
    pull_from_peer[from].erase(pi.soid);
    if (pull_from_peer[from].empty())
      pull_from_peer.erase(from);
  }
  get_parent()->release_locks(pi.lock_manager);
  pulling.erase(piter);
}
int ReplicatedBackend::start_pushes(

View File

@ -167,6 +167,7 @@ private:
ObjectRecoveryInfo recovery_info;
ObjectContextRef obc;
object_stat_sum_t stat;
ObcLockManager lock_manager;
void dump(Formatter *f) const {
{
@ -185,12 +186,15 @@ private:
// pull
struct PullInfo {
pg_shard_t from;
hobject_t soid;
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
ObjectContextRef head_ctx;
ObjectContextRef obc;
object_stat_sum_t stat;
bool cache_dont_need;
ObcLockManager lock_manager;
void dump(Formatter *f) const {
{
@ -214,6 +218,9 @@ private:
// Reverse mapping from osd peer to objects beging pulled from that peer
map<pg_shard_t, set<hobject_t, hobject_t::BitwiseComparator> > pull_from_peer;
void clear_pull(
map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
bool clear_pull_from_peer = true);
void sub_op_push(OpRequestRef op);
void sub_op_push_reply(OpRequestRef op);
@ -233,9 +240,15 @@ private:
bool handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply);
void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);
/// State handed from handle_pull_response to the on-pull-complete
/// continuation so it can start the follow-on pushes.
struct pull_complete_info {
  hobject_t hoid;          ///< object whose local pull just completed
  ObjectContextRef obc;    ///< object context for the recovered object
  object_stat_sum_t stat;  ///< recovery stats accumulated during the pull
};
bool handle_pull_response(
pg_shard_t from, PushOp &op, PullOp *response,
list<hobject_t> *to_continue,
list<pull_complete_info> *to_continue,
ObjectStore::Transaction *t);
void handle_push(pg_shard_t from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
@ -281,7 +294,8 @@ private:
SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
ObcLockManager &lock_manager);
void prepare_pull(
eversion_t v,
const hobject_t& soid,
@ -294,26 +308,31 @@ private:
void prep_push_to_replica(
ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
PushOp *pop, bool cache_dont_need = true);
void prep_push(ObjectContextRef obc,
const hobject_t& oid, pg_shard_t dest,
PushOp *op,
bool cache_dont_need);
void prep_push(ObjectContextRef obc,
const hobject_t& soid, pg_shard_t peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *op,
bool cache = false);
void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
void prep_push(
ObjectContextRef obc,
const hobject_t& oid, pg_shard_t dest,
PushOp *op,
bool cache_dont_need);
void prep_push(
ObjectContextRef obc,
const hobject_t& soid, pg_shard_t peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *op,
bool cache,
ObcLockManager &&lock_manager);
void calc_head_subsets(
ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
ObcLockManager &lock_manager);
ObjectRecoveryInfo recalc_subsets(
const ObjectRecoveryInfo& recovery_info,
SnapSetContext *ssc
);
SnapSetContext *ssc,
ObcLockManager &lock_manager);
/**
* Client IO

View File

@ -4345,6 +4345,9 @@ public:
}
return false;
}
/// Non-blocking attempt to take a read lock on this object.
/// @return true if the lock was acquired, false if it could not be taken
bool try_get_read_lock() {
  return rwstate.get_read_lock();
}
void drop_recovery_read(list<OpRequestRef> *ls) {
assert(rwstate.recovery_read_marker);
rwstate.put_read(ls);
@ -4504,6 +4507,7 @@ public:
ObcLockManager() = default;
ObcLockManager(ObcLockManager &&) = default;
ObcLockManager(const ObcLockManager &) = delete;
ObcLockManager &operator=(ObcLockManager &&) = default;
bool empty() const {
return locks.empty();
}
@ -4564,6 +4568,23 @@ public:
return false;
}
}
/// Try to take a non-blocking read lock on hoid via its obc, tracking
/// the lock in this manager on success.
/// @return true if the lock was taken and recorded, false otherwise
bool try_get_read_lock(
  const hobject_t &hoid,
  ObjectContextRef obc) {
  // A given object may be tracked at most once per manager.
  assert(locks.find(hoid) == locks.end());
  if (!obc->try_get_read_lock())
    return false;
  locks.insert(
    make_pair(hoid, ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
  return true;
}
void put_locks(
list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
bool *requeue_recovery,