mirror of
https://github.com/ceph/ceph
synced 2025-02-22 18:47:18 +00:00
osd: track clock delta between peer OSDs
We need to keep track of the monotonic clock deltas between peer OSDs in order to be able to exchange timestamps across messages. We need an upper and lower bound on this delta, depending on the context where it is used. We can use the existing ping messages to get this by assuming that a ping message is sent instantly to get a bound, and to share our delta in a follow-up reply to share the other bound. The ping sender will get both bounds with a single ping + ping_reply exchange. The ping receiver will get the delta value from the next round's ping. Include up_from in the ping messages to ensure we don't mix up different instances of the same OSD. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
c5a7cb0a7d
commit
541e208cdf
@ -34,7 +34,7 @@
|
||||
|
||||
class MOSDPing : public Message {
|
||||
private:
|
||||
static constexpr int HEAD_VERSION = 4;
|
||||
static constexpr int HEAD_VERSION = 5;
|
||||
static constexpr int COMPAT_VERSION = 4;
|
||||
|
||||
public:
|
||||
@ -61,12 +61,29 @@ private:
|
||||
uuid_d fsid;
|
||||
epoch_t map_epoch = 0;
|
||||
__u8 op = 0;
|
||||
utime_t stamp;
|
||||
utime_t ping_stamp; ///< when the PING was sent
|
||||
ceph::signedspan mono_ping_stamp; ///< relative to sender's clock
|
||||
ceph::signedspan mono_send_stamp; ///< replier's send stamp
|
||||
std::optional<ceph::time_detail::signedspan> delta_ub; ///< ping sender
|
||||
epoch_t up_from = 0;
|
||||
|
||||
uint32_t min_message_size = 0;
|
||||
|
||||
MOSDPing(const uuid_d& f, epoch_t e, __u8 o, utime_t s, uint32_t min_message)
|
||||
MOSDPing(const uuid_d& f, epoch_t e, __u8 o,
|
||||
utime_t s,
|
||||
ceph::signedspan ms,
|
||||
ceph::signedspan mss,
|
||||
epoch_t upf,
|
||||
uint32_t min_message,
|
||||
std::optional<ceph::time_detail::signedspan> delta_ub = {})
|
||||
: Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION},
|
||||
fsid(f), map_epoch(e), op(o), stamp(s), min_message_size(min_message)
|
||||
fsid(f), map_epoch(e), op(o),
|
||||
ping_stamp(s),
|
||||
mono_ping_stamp(ms),
|
||||
mono_send_stamp(mss),
|
||||
delta_ub(delta_ub),
|
||||
up_from(upf),
|
||||
min_message_size(min_message)
|
||||
{ }
|
||||
MOSDPing()
|
||||
: Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION}
|
||||
@ -80,11 +97,19 @@ public:
|
||||
decode(fsid, p);
|
||||
decode(map_epoch, p);
|
||||
decode(op, p);
|
||||
decode(stamp, p);
|
||||
decode(ping_stamp, p);
|
||||
|
||||
int payload_mid_length = p.get_off();
|
||||
uint32_t size;
|
||||
decode(size, p);
|
||||
|
||||
if (header.version >= 5) {
|
||||
decode(up_from, p);
|
||||
decode(mono_ping_stamp, p);
|
||||
decode(mono_send_stamp, p);
|
||||
decode(delta_ub, p);
|
||||
}
|
||||
|
||||
p.advance(size);
|
||||
min_message_size = size + payload_mid_length;
|
||||
}
|
||||
@ -93,13 +118,19 @@ public:
|
||||
encode(fsid, payload);
|
||||
encode(map_epoch, payload);
|
||||
encode(op, payload);
|
||||
encode(stamp, payload);
|
||||
encode(ping_stamp, payload);
|
||||
|
||||
size_t s = 0;
|
||||
if (min_message_size > payload.length()) {
|
||||
s = min_message_size - payload.length();
|
||||
}
|
||||
encode((uint32_t)s, payload);
|
||||
|
||||
encode(up_from, payload);
|
||||
encode(mono_ping_stamp, payload);
|
||||
encode(mono_send_stamp, payload);
|
||||
encode(delta_ub, payload);
|
||||
|
||||
if (s) {
|
||||
// this should be big enough for normal min_message padding sizes. since
|
||||
// we are targeting jumbo ethernet frames around 9000 bytes, 16k should
|
||||
@ -120,8 +151,13 @@ public:
|
||||
void print(ostream& out) const override {
|
||||
out << "osd_ping(" << get_op_name(op)
|
||||
<< " e" << map_epoch
|
||||
<< " stamp " << stamp
|
||||
<< ")";
|
||||
<< " up_from " << up_from
|
||||
<< " ping_stamp " << ping_stamp << "/" << mono_ping_stamp
|
||||
<< " send_stamp " << mono_send_stamp;
|
||||
if (delta_ub) {
|
||||
out << " delta_ub " << *delta_ub;
|
||||
}
|
||||
out << ")";
|
||||
}
|
||||
private:
|
||||
template<class T, typename... Args>
|
||||
|
141
src/osd/OSD.cc
141
src/osd/OSD.cc
@ -426,6 +426,18 @@ void OSDService::need_heartbeat_peer_update()
|
||||
osd->need_heartbeat_peer_update();
|
||||
}
|
||||
|
||||
HeartbeatStampsRef OSDService::get_hb_stamps(unsigned peer)
|
||||
{
|
||||
std::lock_guard l(hb_stamp_lock);
|
||||
if (peer >= hb_stamps.size()) {
|
||||
hb_stamps.resize(peer + 1);
|
||||
}
|
||||
if (!hb_stamps[peer]) {
|
||||
hb_stamps[peer].reset(new HeartbeatStamps(peer));
|
||||
}
|
||||
return hb_stamps[peer];
|
||||
}
|
||||
|
||||
void OSDService::start_shutdown()
|
||||
{
|
||||
{
|
||||
@ -4337,24 +4349,31 @@ void OSD::_add_heartbeat_peer(int p)
|
||||
pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
|
||||
if (!cons.first)
|
||||
return;
|
||||
assert(cons.second);
|
||||
|
||||
hi = &heartbeat_peers[p];
|
||||
hi->peer = p;
|
||||
RefCountedPtr s{new HeartbeatSession{p}, false};
|
||||
|
||||
auto stamps = service.get_hb_stamps(p);
|
||||
|
||||
Session *sb = new Session(cct, cons.first.get());
|
||||
sb->peer = p;
|
||||
sb->stamps = stamps;
|
||||
RefCountedPtr sbref{sb, false};
|
||||
hi->con_back = cons.first.get();
|
||||
hi->con_back->set_priv(s);
|
||||
if (cons.second) {
|
||||
hi->con_front = cons.second.get();
|
||||
hi->con_front->set_priv(s);
|
||||
dout(10) << "_add_heartbeat_peer: new peer osd." << p
|
||||
<< " " << hi->con_back->get_peer_addr()
|
||||
<< " " << hi->con_front->get_peer_addr()
|
||||
<< dendl;
|
||||
} else {
|
||||
hi->con_front.reset(NULL);
|
||||
dout(10) << "_add_heartbeat_peer: new peer osd." << p
|
||||
<< " " << hi->con_back->get_peer_addr()
|
||||
<< dendl;
|
||||
}
|
||||
hi->con_back->set_priv(sbref);
|
||||
|
||||
Session *sf = new Session(cct, cons.second.get());
|
||||
sf->peer = p;
|
||||
sf->stamps = stamps;
|
||||
RefCountedPtr sfref{sf, false};
|
||||
hi->con_front = cons.second.get();
|
||||
hi->con_front->set_priv(sfref);
|
||||
|
||||
dout(10) << "_add_heartbeat_peer: new peer osd." << p
|
||||
<< " " << hi->con_back->get_peer_addr()
|
||||
<< " " << hi->con_front->get_peer_addr()
|
||||
<< dendl;
|
||||
} else {
|
||||
hi = &i->second;
|
||||
}
|
||||
@ -4518,7 +4537,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
{
|
||||
if (superblock.cluster_fsid != m->fsid) {
|
||||
dout(20) << "handle_osd_ping from " << m->get_source_inst()
|
||||
<< " bad fsid " << m->fsid << " != " << superblock.cluster_fsid << dendl;
|
||||
<< " bad fsid " << m->fsid << " != " << superblock.cluster_fsid
|
||||
<< dendl;
|
||||
m->put();
|
||||
return;
|
||||
}
|
||||
@ -4533,6 +4553,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
}
|
||||
|
||||
utime_t now = ceph_clock_now();
|
||||
auto mnow = service.get_mnow();
|
||||
ConnectionRef con(m->get_connection());
|
||||
OSDMapRef curmap = service.get_osdmap();
|
||||
if (!curmap) {
|
||||
@ -4541,6 +4562,18 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
return;
|
||||
}
|
||||
|
||||
auto sref = con->get_priv();
|
||||
Session *s = static_cast<Session*>(sref.get());
|
||||
if (!s) {
|
||||
heartbeat_lock.unlock();
|
||||
m->put();
|
||||
return;
|
||||
}
|
||||
if (!s->stamps) {
|
||||
s->peer = from;
|
||||
s->stamps = service.get_hb_stamps(from);
|
||||
}
|
||||
|
||||
switch (m->op) {
|
||||
|
||||
case MOSDPing::PING:
|
||||
@ -4569,6 +4602,15 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
}
|
||||
}
|
||||
|
||||
ceph::signedspan sender_delta_ub;
|
||||
s->stamps->got_ping(
|
||||
m->up_from,
|
||||
mnow,
|
||||
m->mono_send_stamp,
|
||||
m->delta_ub,
|
||||
&sender_delta_ub);
|
||||
dout(20) << __func__ << " new stamps " << *s->stamps << dendl;
|
||||
|
||||
if (!cct->get_heartbeat_map()->is_healthy()) {
|
||||
dout(10) << "internal heartbeat not healthy, dropping ping request"
|
||||
<< dendl;
|
||||
@ -4577,8 +4619,13 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
|
||||
Message *r = new MOSDPing(monc->get_fsid(),
|
||||
curmap->get_epoch(),
|
||||
MOSDPing::PING_REPLY, m->stamp,
|
||||
cct->_conf->osd_heartbeat_min_size);
|
||||
MOSDPing::PING_REPLY,
|
||||
m->ping_stamp,
|
||||
m->mono_ping_stamp,
|
||||
mnow,
|
||||
service.get_up_epoch(),
|
||||
cct->_conf->osd_heartbeat_min_size,
|
||||
sender_delta_ub);
|
||||
con->send_message(r);
|
||||
|
||||
if (curmap->is_up(from)) {
|
||||
@ -4595,7 +4642,10 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
Message *r = new MOSDPing(monc->get_fsid(),
|
||||
curmap->get_epoch(),
|
||||
MOSDPing::YOU_DIED,
|
||||
m->stamp,
|
||||
m->ping_stamp,
|
||||
m->mono_ping_stamp,
|
||||
mnow,
|
||||
service.get_up_epoch(),
|
||||
cct->_conf->osd_heartbeat_min_size);
|
||||
con->send_message(r);
|
||||
}
|
||||
@ -4606,7 +4656,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
{
|
||||
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
|
||||
if (i != heartbeat_peers.end()) {
|
||||
auto acked = i->second.ping_history.find(m->stamp);
|
||||
auto acked = i->second.ping_history.find(m->ping_stamp);
|
||||
if (acked != i->second.ping_history.end()) {
|
||||
int &unacknowledged = acked->second.second;
|
||||
if (con == i->second.con_back) {
|
||||
@ -4642,7 +4692,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
if (unacknowledged == 0) {
|
||||
// succeeded in getting all replies
|
||||
dout(25) << "handle_osd_ping got all replies from osd." << from
|
||||
<< " , erase pending ping(sent at " << m->stamp << ")"
|
||||
<< " , erase pending ping(sent at " << m->ping_stamp << ")"
|
||||
<< " and older pending ping(s)"
|
||||
<< dendl;
|
||||
i->second.ping_history.erase(i->second.ping_history.begin(), ++acked);
|
||||
@ -4669,7 +4719,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
}
|
||||
} else {
|
||||
// old replies, deprecated by newly sent pings.
|
||||
dout(10) << "handle_osd_ping no pending ping(sent at " << m->stamp
|
||||
dout(10) << "handle_osd_ping no pending ping(sent at " << m->ping_stamp
|
||||
<< ") is found, treat as covered by newly sent pings "
|
||||
<< "and ignore"
|
||||
<< dendl;
|
||||
@ -4686,6 +4736,12 @@ void OSD::handle_osd_ping(MOSDPing *m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s->stamps->got_ping_reply(
|
||||
mnow,
|
||||
m->mono_send_stamp,
|
||||
m->delta_ub);
|
||||
dout(20) << __func__ << " new stamps " << *s->stamps << dendl;
|
||||
}
|
||||
break;
|
||||
|
||||
@ -4807,6 +4863,7 @@ void OSD::heartbeat()
|
||||
service.check_full_status(ratio, pratio);
|
||||
|
||||
utime_t now = ceph_clock_now();
|
||||
auto mnow = service.get_mnow();
|
||||
utime_t deadline = now;
|
||||
deadline += cct->_conf->osd_heartbeat_grace;
|
||||
|
||||
@ -4815,22 +4872,40 @@ void OSD::heartbeat()
|
||||
i != heartbeat_peers.end();
|
||||
++i) {
|
||||
int peer = i->first;
|
||||
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
|
||||
|
||||
i->second.last_tx = now;
|
||||
if (i->second.first_tx == utime_t())
|
||||
i->second.first_tx = now;
|
||||
i->second.ping_history[now] = make_pair(deadline,
|
||||
HeartbeatInfo::HEARTBEAT_MAX_CONN);
|
||||
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
|
||||
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
|
||||
service.get_osdmap_epoch(),
|
||||
MOSDPing::PING, now,
|
||||
cct->_conf->osd_heartbeat_min_size));
|
||||
|
||||
Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
|
||||
std::optional<ceph::signedspan> delta_ub;
|
||||
s->stamps->sent_ping(&delta_ub);
|
||||
|
||||
i->second.con_back->send_message(
|
||||
new MOSDPing(monc->get_fsid(),
|
||||
service.get_osdmap_epoch(),
|
||||
MOSDPing::PING,
|
||||
now,
|
||||
mnow,
|
||||
mnow,
|
||||
service.get_up_epoch(),
|
||||
cct->_conf->osd_heartbeat_min_size,
|
||||
delta_ub));
|
||||
|
||||
if (i->second.con_front)
|
||||
i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
|
||||
service.get_osdmap_epoch(),
|
||||
MOSDPing::PING, now,
|
||||
cct->_conf->osd_heartbeat_min_size));
|
||||
i->second.con_front->send_message(
|
||||
new MOSDPing(monc->get_fsid(),
|
||||
service.get_osdmap_epoch(),
|
||||
MOSDPing::PING,
|
||||
now,
|
||||
mnow,
|
||||
mnow,
|
||||
service.get_up_epoch(),
|
||||
cct->_conf->osd_heartbeat_min_size,
|
||||
delta_ub));
|
||||
}
|
||||
|
||||
logger->set(l_osd_hb_to, heartbeat_peers.size());
|
||||
@ -4857,8 +4932,8 @@ bool OSD::heartbeat_reset(Connection *con)
|
||||
if (is_stopping()) {
|
||||
return true;
|
||||
}
|
||||
auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
|
||||
auto p = heartbeat_peers.find(heartbeat_session->peer);
|
||||
auto session = static_cast<Session*>(s.get());
|
||||
auto p = heartbeat_peers.find(session->peer);
|
||||
if (p != heartbeat_peers.end() &&
|
||||
(p->second.con_back == con ||
|
||||
p->second.con_front == con)) {
|
||||
|
@ -850,6 +850,15 @@ public:
|
||||
|
||||
void request_osdmap_update(epoch_t e);
|
||||
|
||||
// -- heartbeats --
|
||||
ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");
|
||||
|
||||
/// osd -> heartbeat stamps
|
||||
vector<HeartbeatStampsRef> hb_stamps;
|
||||
|
||||
/// get or create a ref for a peer's HeartbeatStamps
|
||||
HeartbeatStampsRef get_hb_stamps(unsigned osd);
|
||||
|
||||
// -- stopping --
|
||||
ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
|
||||
ceph::condition_variable is_stopping_cond;
|
||||
@ -1436,11 +1445,7 @@ private:
|
||||
return !is_unhealthy(now);
|
||||
}
|
||||
};
|
||||
/// state attached to outgoing heartbeat connections
|
||||
struct HeartbeatSession : public RefCountedObject {
|
||||
int peer;
|
||||
explicit HeartbeatSession(int p) : peer(p) {}
|
||||
};
|
||||
|
||||
ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
|
||||
map<int, int> debug_heartbeat_drops_remaining;
|
||||
ceph::condition_variable heartbeat_cond;
|
||||
|
@ -78,6 +78,99 @@ struct BufferedRecoveryMessages {
|
||||
}
|
||||
};
|
||||
|
||||
struct HeartbeatStamps : public RefCountedObject {
|
||||
mutable ceph::mutex lock = ceph::make_mutex("HeartbeatStamps::lock");
|
||||
|
||||
const int osd;
|
||||
|
||||
// we maintain an upper and lower bound on the delta between our local
|
||||
// mono_clock time (minus the startup_time) to the peer OSD's mono_clock
|
||||
// time (minus its startup_time).
|
||||
//
|
||||
// delta is (remote_clock_time - local_clock_time), so that
|
||||
// local_time + delta -> peer_time, and peer_time - delta -> local_time.
|
||||
//
|
||||
// we have an upper and lower bound value on this delta, meaning the
|
||||
// value of the remote clock is somewhere between [my_time + lb, my_time + ub]
|
||||
//
|
||||
// conversely, if we have a remote timestamp T, then that is
|
||||
// [T - ub, T - lb] in terms of the local clock. i.e., if you are
|
||||
// substracting the delta, then take care that you swap the role of the
|
||||
// lb and ub values.
|
||||
|
||||
/// lower bound on peer clock - local clock
|
||||
std::optional<ceph::signedspan> peer_clock_delta_lb;
|
||||
|
||||
/// upper bound on peer clock - local clock
|
||||
std::optional<ceph::signedspan> peer_clock_delta_ub;
|
||||
|
||||
/// highest up_from we've seen from this rank
|
||||
epoch_t up_from = 0;
|
||||
|
||||
HeartbeatStamps(int o)
|
||||
: RefCountedObject(NULL, 0),
|
||||
osd(o) {}
|
||||
|
||||
void print(ostream& out) const {
|
||||
std::lock_guard l(lock);
|
||||
out << "hbstamp(osd." << osd << " up_from " << up_from
|
||||
<< " peer_clock_delta [";
|
||||
if (peer_clock_delta_lb) {
|
||||
out << *peer_clock_delta_lb;
|
||||
}
|
||||
out << ",";
|
||||
if (peer_clock_delta_ub) {
|
||||
out << *peer_clock_delta_ub;
|
||||
}
|
||||
out << "])";
|
||||
}
|
||||
|
||||
void sent_ping(std::optional<ceph::signedspan> *delta_ub) {
|
||||
std::lock_guard l(lock);
|
||||
// the non-primaries need a lower bound on remote clock - local clock. if
|
||||
// we assume the transit for the last ping_reply was
|
||||
// instantaneous, that would be (the negative of) our last
|
||||
// peer_clock_delta_lb value.
|
||||
if (peer_clock_delta_lb) {
|
||||
*delta_ub = - *peer_clock_delta_lb;
|
||||
}
|
||||
}
|
||||
|
||||
void got_ping(epoch_t this_up_from,
|
||||
ceph::signedspan now,
|
||||
ceph::signedspan peer_send_stamp,
|
||||
std::optional<ceph::signedspan> delta_ub,
|
||||
ceph::signedspan *out_delta_ub) {
|
||||
std::lock_guard l(lock);
|
||||
if (this_up_from < up_from) {
|
||||
return;
|
||||
}
|
||||
if (this_up_from > up_from) {
|
||||
up_from = this_up_from;
|
||||
}
|
||||
peer_clock_delta_lb = peer_send_stamp - now;
|
||||
peer_clock_delta_ub = delta_ub;
|
||||
*out_delta_ub = - *peer_clock_delta_lb;
|
||||
}
|
||||
|
||||
void got_ping_reply(ceph::signedspan now,
|
||||
ceph::signedspan peer_send_stamp,
|
||||
std::optional<ceph::signedspan> delta_ub) {
|
||||
std::lock_guard l(lock);
|
||||
peer_clock_delta_lb = peer_send_stamp - now;
|
||||
peer_clock_delta_ub = delta_ub;
|
||||
}
|
||||
|
||||
};
|
||||
typedef boost::intrusive_ptr<HeartbeatStamps> HeartbeatStampsRef;
|
||||
|
||||
inline ostream& operator<<(ostream& out, const HeartbeatStamps& hb)
|
||||
{
|
||||
hb.print(out);
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
struct PeeringCtx : BufferedRecoveryMessages {
|
||||
ObjectStore::Transaction transaction;
|
||||
HBHandle* handle = nullptr;
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "OSDCap.h"
|
||||
#include "Watch.h"
|
||||
#include "OSDMap.h"
|
||||
#include "PeeringState.h"
|
||||
|
||||
//#define PG_DEBUG_REFS
|
||||
|
||||
@ -148,6 +149,10 @@ struct Session : public RefCountedObject {
|
||||
|
||||
std::atomic<uint64_t> backoff_seq = {0};
|
||||
|
||||
// for heartbeat connections only
|
||||
int peer = -1;
|
||||
HeartbeatStampsRef stamps;
|
||||
|
||||
explicit Session(CephContext *cct, Connection *con_) :
|
||||
RefCountedObject(cct),
|
||||
con(con_),
|
||||
|
Loading…
Reference in New Issue
Block a user