Merge PR #29517 into master

* refs/pull/29517/head:
	osd: include PastInterals in pg_notify_t

Reviewed-by: Samuel Just <sjust@redhat.com>
This commit is contained in:
Sage Weil 2019-08-08 15:49:17 -05:00
commit 01fcf0e401
10 changed files with 172 additions and 145 deletions

View File

@ -116,20 +116,18 @@ std::vector<OperationRef> handle_pg_notify(
std::vector<OperationRef> ret;
ret.reserve(m->get_pg_list().size());
const int from = m->get_source().num();
for (auto &p : m->get_pg_list()) {
auto& [pg_notify, past_intervals] = p;
for (auto& pg_notify : m->get_pg_list()) {
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
MNotifyRec notify{pgid,
pg_shard_t{from, pg_notify.from},
pg_notify,
0, // the features is not used
past_intervals};
0}; // the features is not used
logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
auto create_info = new PGCreateInfo{
pgid,
pg_notify.query_epoch,
pg_notify.info.history,
past_intervals,
pg_notify.past_intervals,
false};
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
state,
@ -157,8 +155,7 @@ std::vector<OperationRef> handle_pg_info(
std::vector<OperationRef> ret;
ret.reserve(m->pg_list.size());
const int from = m->get_source().num();
for (auto &p : m->pg_list) {
auto& pg_notify = p.first;
for (auto& pg_notify : m->pg_list) {
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
MInfoRec info{pg_shard_t{from, pg_notify.from},
@ -193,8 +190,8 @@ public:
from.shard, pgid.shard,
evt.get_epoch_sent(),
osd.get_shard_services().get_osdmap()->get_epoch(),
empty),
PastIntervals());
empty,
PastIntervals()));
}
};

View File

@ -21,13 +21,13 @@
class MOSDPGInfo : public Message {
private:
static constexpr int HEAD_VERSION = 5;
static constexpr int HEAD_VERSION = 6;
static constexpr int COMPAT_VERSION = 5;
epoch_t epoch = 0;
public:
using pg_list_t = std::vector<std::pair<pg_notify_t,PastIntervals>>;
using pg_list_t = std::vector<pg_notify_t>;
pg_list_t pg_list;
epoch_t get_epoch() const { return epoch; }
@ -57,7 +57,7 @@ public:
++i) {
if (i != pg_list.begin())
out << " ";
out << i->first << "=" << i->second;
out << *i;
}
out << " epoch " << epoch
<< ")";
@ -65,12 +65,34 @@ public:
void encode_payload(uint64_t features) override {
using ceph::encode;
header.version = HEAD_VERSION;
encode(epoch, payload);
if (!HAVE_FEATURE(features, SERVER_OCTOPUS)) {
// pretend to be vector<pair<pg_notify_t,PastIntervals>>
header.version = 5;
encode((uint32_t)pg_list.size(), payload);
for (auto& i : pg_list) {
encode(i, payload); // this embeds a dup (ignored) PastIntervals
encode(i.past_intervals, payload);
}
return;
}
encode(pg_list, payload);
}
void decode_payload() override {
auto p = payload.cbegin();
decode(epoch, p);
if (header.version == 5) {
// decode legacy vector<pair<pg_notify_t,PastIntervals>>
uint32_t num;
decode(num, p);
pg_list.resize(num);
for (unsigned i = 0; i < num; ++i) {
decode(pg_list[i], p);
decode(pg_list[i].past_intervals, p);
}
return;
}
decode(pg_list, p);
}
private:

View File

@ -25,7 +25,7 @@
class MOSDPGNotify : public Message {
private:
static constexpr int HEAD_VERSION = 6;
static constexpr int HEAD_VERSION = 7;
static constexpr int COMPAT_VERSION = 6;
epoch_t epoch = 0;
@ -33,8 +33,8 @@ private:
/// the current epoch if this is not being sent in response to a
/// query. This allows the recipient to disregard responses to old
/// queries.
using pg_list_t = std::vector<std::pair<pg_notify_t,PastIntervals>>;
pg_list_t pg_list; // pgid -> version
using pg_list_t = std::vector<pg_notify_t>;
pg_list_t pg_list;
public:
version_t get_epoch() const { return epoch; }
@ -59,13 +59,35 @@ public:
void encode_payload(uint64_t features) override {
using ceph::encode;
header.version = HEAD_VERSION;
encode(epoch, payload);
if (!HAVE_FEATURE(features, SERVER_OCTOPUS)) {
// pretend to be vector<pair<pg_notify_t,PastIntervals>>
header.version = 6;
encode((uint32_t)pg_list.size(), payload);
for (auto& i : pg_list) {
encode(i, payload); // this embeds a dup (ignored) PastIntervals
encode(i.past_intervals, payload);
}
return;
}
encode(pg_list, payload);
}
void decode_payload() override {
auto p = payload.cbegin();
decode(epoch, p);
if (header.version == 6) {
// decode legacy vector<pair<pg_notify_t,PastIntervals>>
uint32_t num;
decode(num, p);
pg_list.resize(num);
for (unsigned i = 0; i < num; ++i) {
decode(pg_list[i], p);
decode(pg_list[i].past_intervals, p);
}
return;
}
decode(pg_list, p);
}
void print(ostream& out) const override {
@ -75,7 +97,7 @@ public:
++i) {
if (i != pg_list.begin())
out << " ";
out << i->first << "=" << i->second;
out << *i;
}
out << " epoch " << epoch
<< ")";

View File

@ -8985,30 +8985,25 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
*/
void OSD::do_notifies(
map<int,vector<pair<pg_notify_t,PastIntervals> > >& notify_list,
map<int,vector<pg_notify_t>>& notify_list,
OSDMapRef curmap)
{
for (map<int,
vector<pair<pg_notify_t,PastIntervals> > >::iterator it =
notify_list.begin();
it != notify_list.end();
++it) {
if (!curmap->is_up(it->first)) {
dout(20) << __func__ << " skipping down osd." << it->first << dendl;
for (auto& [osd, notifies] : notify_list) {
if (!curmap->is_up(osd)) {
dout(20) << __func__ << " skipping down osd." << osd << dendl;
continue;
}
ConnectionRef con = service.get_con_osd_cluster(
it->first, curmap->get_epoch());
osd, curmap->get_epoch());
if (!con) {
dout(20) << __func__ << " skipping osd." << it->first
<< " (NULL con)" << dendl;
dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
continue;
}
service.maybe_share_map(con.get(), curmap);
dout(7) << __func__ << " osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
dout(7) << __func__ << " osd." << osd
<< " on " << notifies.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
std::move(it->second));
std::move(notifies));
con->send_message(m);
}
}
@ -9044,35 +9039,27 @@ void OSD::do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
}
void OSD::do_infos(map<int,
vector<pair<pg_notify_t, PastIntervals> > >& info_map,
void OSD::do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef curmap)
{
for (map<int,
vector<pair<pg_notify_t, PastIntervals> > >::iterator p =
info_map.begin();
p != info_map.end();
++p) {
if (!curmap->is_up(p->first)) {
dout(20) << __func__ << " skipping down osd." << p->first << dendl;
for (auto& [osd, notifies] : info_map) {
if (!curmap->is_up(osd)) {
dout(20) << __func__ << " skipping down osd." << osd << dendl;
continue;
}
for (vector<pair<pg_notify_t,PastIntervals> >::iterator i = p->second.begin();
i != p->second.end();
++i) {
dout(20) << __func__ << " sending info " << i->first.info
<< " to shard " << p->first << dendl;
for (auto& i : notifies) {
dout(20) << __func__ << " sending info " << i.info
<< " to osd " << osd << dendl;
}
ConnectionRef con = service.get_con_osd_cluster(
p->first, curmap->get_epoch());
osd, curmap->get_epoch());
if (!con) {
dout(20) << __func__ << " skipping osd." << p->first
<< " (NULL con)" << dendl;
dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
continue;
}
service.maybe_share_map(con.get(), curmap);
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
m->pg_list = std::move(notifies);
con->send_message(m);
}
info_map.clear();
@ -9184,24 +9171,23 @@ void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
}
int from = m->get_source().num();
for (auto& p : m->get_pg_list()) {
spg_t pgid(p.first.info.pgid.pgid, p.first.to);
spg_t pgid(p.info.pgid.pgid, p.to);
enqueue_peering_evt(
pgid,
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
p.first.epoch_sent,
p.first.query_epoch,
p.epoch_sent,
p.query_epoch,
MNotifyRec(
pgid, pg_shard_t(from, p.first.from),
p.first,
m->get_connection()->get_features(),
p.second),
pgid, pg_shard_t(from, p.from),
p,
m->get_connection()->get_features()),
true,
new PGCreateInfo(
pgid,
p.first.query_epoch,
p.first.info.history,
p.second,
p.query_epoch,
p.info.history,
p.past_intervals,
false)
)));
}
@ -9218,14 +9204,14 @@ void OSD::handle_fast_pg_info(MOSDPGInfo* m)
int from = m->get_source().num();
for (auto& p : m->pg_list) {
enqueue_peering_evt(
spg_t(p.first.info.pgid.pgid, p.first.to),
spg_t(p.info.pgid.pgid, p.to),
PGPeeringEventRef(
std::make_shared<PGPeeringEvent>(
p.first.epoch_sent, p.first.query_epoch,
p.epoch_sent, p.query_epoch,
MInfoRec(
pg_shard_t(from, p.first.from),
p.first.info,
p.first.epoch_sent)))
pg_shard_t(from, p.from),
p.info,
p.epoch_sent)))
);
}
m->put();
@ -9316,14 +9302,13 @@ void OSD::handle_pg_query_nopg(const MQuery& q)
osdmap->get_epoch(), empty,
q.query.epoch_sent);
} else {
vector<pair<pg_notify_t,PastIntervals>> ls;
vector<pg_notify_t> ls;
ls.push_back(
make_pair(
pg_notify_t(
q.query.from, q.query.to,
q.query.epoch_sent,
osdmap->get_epoch(),
empty),
pg_notify_t(
q.query.from, q.query.to,
q.query.epoch_sent,
osdmap->get_epoch(),
empty,
PastIntervals()));
m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
}

View File

@ -1897,14 +1897,11 @@ protected:
void dispatch_context_transaction(PeeringCtx &ctx, PG *pg,
ThreadPool::TPHandle *handle = NULL);
void discard_context(PeeringCtx &ctx);
void do_notifies(map<int,
vector<pair<pg_notify_t, PastIntervals> > >&
notify_list,
void do_notifies(map<int,vector<pg_notify_t>>& notify_list,
OSDMapRef map);
void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
OSDMapRef map);
void do_infos(map<int,
vector<pair<pg_notify_t, PastIntervals> > >& info_map,
void do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef map);
bool require_mon_peer(const Message *m);

View File

@ -94,14 +94,11 @@ struct MNotifyRec : boost::statechart::event< MNotifyRec > {
pg_shard_t from;
pg_notify_t notify;
uint64_t features;
PastIntervals past_intervals;
MNotifyRec(spg_t p, pg_shard_t from, const pg_notify_t &notify, uint64_t f,
const PastIntervals& pi)
: pgid(p), from(from), notify(notify), features(f), past_intervals(pi) {}
MNotifyRec(spg_t p, pg_shard_t from, const pg_notify_t &notify, uint64_t f)
: pgid(p), from(from), notify(notify), features(f) {}
void print(std::ostream *out) const {
*out << "MNotifyRec " << pgid << " from " << from << " notify: " << notify
<< " features: 0x" << std::hex << features << std::dec
<< " " << past_intervals;
<< " features: 0x" << std::hex << features << std::dec;
}
};

View File

@ -1960,8 +1960,7 @@ bool PeeringState::search_for_missing(
from.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
tinfo),
past_intervals);
tinfo, past_intervals));
}
return found_missing;
}
@ -2059,9 +2058,7 @@ void PeeringState::activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,
map<int, map<spg_t,pg_query_t> >& query_map,
map<int,
vector<
pair<pg_notify_t, PastIntervals> > > *activator_map,
map<int,vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx)
{
ceph_assert(!is_peered());
@ -2186,8 +2183,8 @@ void PeeringState::activate(
peer.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
info),
past_intervals);
info,
past_intervals));
} else {
psdout(10) << "activate peer osd." << peer
<< " is up to date, but sending pg_log anyway" << dendl;
@ -2399,8 +2396,8 @@ void PeeringState::share_pg_info()
pg_shard.shard, pg_whoami.shard,
get_osdmap_epoch(),
get_osdmap_epoch(),
info),
past_intervals);
info,
past_intervals));
pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
}
}
@ -2567,8 +2564,8 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
notify_info.first.shard, pg_whoami.shard,
query.query_epoch,
get_osdmap_epoch(),
notify_info.second),
past_intervals);
notify_info.second,
past_intervals));
} else {
update_history(query.query.history);
fulfill_log(query.from, query.query, query.query_epoch);
@ -4105,8 +4102,8 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&)
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
ps->info),
ps->past_intervals);
ps->info,
ps->past_intervals));
}
ps->update_heartbeat_peers();
@ -5631,7 +5628,8 @@ boost::statechart::result PeeringState::ReplicaActive::react(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
ps->info);
ps->info,
PastIntervals());
i.info.history.last_epoch_started = evt.activation_epoch;
i.info.history.last_interval_started = i.info.history.same_interval_since;
@ -5641,7 +5639,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(
ps->state_set(PG_STATE_PEERED);
}
m->pg_list.emplace_back(i, PastIntervals());
m->pg_list.emplace_back(i);
pl->send_cluster_message(
ps->get_primary().osd,
m,
@ -5689,8 +5687,8 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&)
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
ps->info),
ps->past_intervals);
ps->info,
ps->past_intervals));
}
return discard_event();
}
@ -5809,8 +5807,8 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&)
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
ps->info),
ps->past_intervals);
ps->info,
ps->past_intervals));
}
return discard_event();
}

View File

@ -52,8 +52,8 @@ class PeeringCtx;
// [primary only] content recovery state
struct BufferedRecoveryMessages {
map<int, map<spg_t, pg_query_t> > query_map;
map<int, vector<pair<pg_notify_t, PastIntervals> > > info_map;
map<int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
map<int, vector<pg_notify_t>> info_map;
map<int, vector<pg_notify_t>> notify_list;
BufferedRecoveryMessages() = default;
BufferedRecoveryMessages(PeeringCtx &);
@ -371,8 +371,8 @@ private:
struct PeeringCtxWrapper {
utime_t start_time;
map<int, map<spg_t, pg_query_t> > &query_map;
map<int, vector<pair<pg_notify_t, PastIntervals> > > &info_map;
map<int, vector<pair<pg_notify_t, PastIntervals> > > &notify_list;
map<int, vector<pg_notify_t>> &info_map;
map<int, vector<pg_notify_t>> &notify_list;
ObjectStore::Transaction &transaction;
HBHandle * const handle = nullptr;
@ -392,9 +392,8 @@ private:
PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
void send_notify(pg_shard_t to,
const pg_notify_t &info, const PastIntervals &pi) {
notify_list[to.osd].emplace_back(info, pi);
void send_notify(pg_shard_t to, const pg_notify_t &n) {
notify_list[to.osd].emplace_back(n);
}
};
public:
@ -570,7 +569,7 @@ public:
return state->rctx->query_map;
}
map<int, vector<pair<pg_notify_t, PastIntervals> > > &get_info_map() {
map<int, vector<pg_notify_t>> &get_info_map() {
ceph_assert(state->rctx);
return state->rctx->info_map;
}
@ -580,10 +579,9 @@ public:
return *(state->rctx);
}
void send_notify(pg_shard_t to,
const pg_notify_t &info, const PastIntervals &pi) {
void send_notify(pg_shard_t to, const pg_notify_t &n) {
ceph_assert(state->rctx);
state->rctx->send_notify(to, info, pi);
state->rctx->send_notify(to, n);
}
};
friend class PeeringMachine;
@ -1512,7 +1510,7 @@ public:
ObjectStore::Transaction& t,
epoch_t activation_epoch,
map<int, map<spg_t,pg_query_t> >& query_map,
map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
map<int, vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);

View File

@ -3344,23 +3344,27 @@ void pg_info_t::generate_test_instances(list<pg_info_t*>& o)
// -- pg_notify_t --
void pg_notify_t::encode(ceph::buffer::list &bl) const
{
ENCODE_START(2, 2, bl);
ENCODE_START(3, 2, bl);
encode(query_epoch, bl);
encode(epoch_sent, bl);
encode(info, bl);
encode(to, bl);
encode(from, bl);
encode(past_intervals, bl);
ENCODE_FINISH(bl);
}
void pg_notify_t::decode(ceph::buffer::list::const_iterator &bl)
{
DECODE_START(2, bl);
DECODE_START(3, bl);
decode(query_epoch, bl);
decode(epoch_sent, bl);
decode(info, bl);
decode(to, bl);
decode(from, bl);
if (struct_v >= 3) {
decode(past_intervals, bl);
}
DECODE_FINISH(bl);
}
@ -3375,12 +3379,15 @@ void pg_notify_t::dump(Formatter *f) const
info.dump(f);
f->close_section();
}
f->dump_object("past_intervals", past_intervals);
}
void pg_notify_t::generate_test_instances(list<pg_notify_t*>& o)
{
o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1, pg_info_t()));
o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(0), 3, 10, pg_info_t()));
o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1,
pg_info_t(), PastIntervals()));
o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(0), 3, 10,
pg_info_t(), PastIntervals()));
}
ostream &operator<<(ostream &lhs, const pg_notify_t &notify)
@ -3392,6 +3399,7 @@ ostream &operator<<(ostream &lhs, const pg_notify_t &notify)
notify.to != shard_id_t::NO_SHARD)
lhs << " " << (unsigned)notify.from
<< "->" << (unsigned)notify.to;
lhs << " " << notify.past_intervals;
return lhs << ")";
}

View File

@ -2993,35 +2993,6 @@ struct pg_fast_info_t {
WRITE_CLASS_ENCODER(pg_fast_info_t)
struct pg_notify_t {
epoch_t query_epoch;
epoch_t epoch_sent;
pg_info_t info;
shard_id_t to;
shard_id_t from;
pg_notify_t() :
query_epoch(0), epoch_sent(0), to(shard_id_t::NO_SHARD),
from(shard_id_t::NO_SHARD) {}
pg_notify_t(
shard_id_t to,
shard_id_t from,
epoch_t query_epoch,
epoch_t epoch_sent,
const pg_info_t &info)
: query_epoch(query_epoch),
epoch_sent(epoch_sent),
info(info), to(to), from(from) {
ceph_assert(from == info.pgid.shard);
}
void encode(ceph::buffer::list &bl) const;
void decode(ceph::buffer::list::const_iterator &p);
void dump(ceph::Formatter *f) const;
static void generate_test_instances(std::list<pg_notify_t*> &o);
};
WRITE_CLASS_ENCODER(pg_notify_t)
std::ostream &operator<<(std::ostream &lhs, const pg_notify_t &notify);
class OSDMap;
/**
* PastIntervals -- information needed to determine the PriorSet and
@ -3523,6 +3494,38 @@ PastIntervals::PriorSet::PriorSet(
<< dendl;
}
struct pg_notify_t {
epoch_t query_epoch;
epoch_t epoch_sent;
pg_info_t info;
shard_id_t to;
shard_id_t from;
PastIntervals past_intervals;
pg_notify_t() :
query_epoch(0), epoch_sent(0), to(shard_id_t::NO_SHARD),
from(shard_id_t::NO_SHARD) {}
pg_notify_t(
shard_id_t to,
shard_id_t from,
epoch_t query_epoch,
epoch_t epoch_sent,
const pg_info_t &info,
const PastIntervals& pi)
: query_epoch(query_epoch),
epoch_sent(epoch_sent),
info(info), to(to), from(from),
past_intervals(pi) {
ceph_assert(from == info.pgid.shard);
}
void encode(ceph::buffer::list &bl) const;
void decode(ceph::buffer::list::const_iterator &p);
void dump(ceph::Formatter *f) const;
static void generate_test_instances(std::list<pg_notify_t*> &o);
};
WRITE_CLASS_ENCODER(pg_notify_t)
std::ostream &operator<<(std::ostream &lhs, const pg_notify_t &notify);
/**
* pg_query_t - used to ask a peer for information about a pg.
*