crimson/net: send AckFrame for lossless policy

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
This commit is contained in:
Yingxin Cheng 2019-08-08 16:10:54 +08:00
parent 6cacf1f7b2
commit 49a08e8bc3
6 changed files with 45 additions and 13 deletions

View File

@ -96,6 +96,14 @@ void Protocol::notify_keepalive_ack(utime_t _keepalive_ack)
write_event();
}
void Protocol::notify_ack()
{
if (!conn.policy.lossy) {
++ack_left;
write_event();
}
}
void Protocol::requeue_sent()
{
assert(write_state != write_state_t::open);
@ -146,6 +154,7 @@ void Protocol::reset_write()
conn.sent.clear();
need_keepalive = false;
keepalive_ack = std::nullopt;
ack_left = 0;
}
void Protocol::ack_writes(seq_num_t seq)
@ -177,14 +186,18 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
conn.pending_q.begin(),
conn.pending_q.end());
}
auto acked = ack_left;
assert(acked == 0 || conn.in_seq > 0);
// sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages(
conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
.then([this, prv_keepalive_ack=keepalive_ack] {
conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
).then([this, prv_keepalive_ack=keepalive_ack, acked] {
need_keepalive = false;
if (keepalive_ack == prv_keepalive_ack) {
keepalive_ack = std::nullopt;
}
assert(ack_left >= acked);
ack_left -= acked;
if (!is_queued()) {
// good, we have nothing pending to send now.
return socket->flush().then([this] {

View File

@ -44,7 +44,8 @@ class Protocol {
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack) = 0;
std::optional<utime_t> keepalive_ack,
bool require_ack) = 0;
public:
const proto_t proto_type;
@ -102,6 +103,8 @@ class Protocol {
void notify_keepalive_ack(utime_t keepalive_ack);
void notify_ack();
void requeue_up_to(seq_num_t seq);
void requeue_sent();
@ -110,6 +113,7 @@ class Protocol {
bool is_queued() const {
return (!conn.out_q.empty() ||
ack_left > 0 ||
need_keepalive ||
keepalive_ack.has_value());
}
@ -123,6 +127,7 @@ class Protocol {
bool need_keepalive = false;
std::optional<utime_t> keepalive_ack = std::nullopt;
uint64_t ack_left = 0;
bool write_dispatching = false;
// Indicate if we are in the middle of writing.
bool open_write = false;

View File

@ -680,7 +680,8 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> _keepalive_ack)
std::optional<utime_t> _keepalive_ack,
bool require_ack)
{
static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
sizeof(ceph_msg_header) +
@ -711,6 +712,15 @@ ceph::bufferlist ProtocolV1::do_sweep_messages(
bl.append(create_static(k.ack));
}
if (require_ack) {
// XXX: we decided not to support lossless connection in v1. as the
// client's default policy is
// Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is
// lossy. And by the time of crimson-osd's GA, the in-cluster communication
// will all be performed using v2 protocol.
ceph_abort("lossless policy not supported for v1");
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
ceph_assert(!msg->get_seq() && "message already has seq");
msg->set_seq(++conn.out_seq);

View File

@ -30,7 +30,8 @@ class ProtocolV1 final : public Protocol {
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack) override;
std::optional<utime_t> keepalive_ack,
bool require_ack) override;
private:
SocketMessenger &messenger;

View File

@ -1540,7 +1540,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> _keepalive_ack)
std::optional<utime_t> _keepalive_ack,
bool require_ack)
{
ceph::bufferlist bl;
@ -1554,6 +1555,11 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
}
if (require_ack && !num_msgs) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
bl.append(ack_frame.get_buffer(session_stream_handlers));
}
std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
// TODO: move to common code
// set priority
@ -1563,8 +1569,6 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
ceph_assert(!msg->get_seq() && "message already has seq");
msg->set_seq(++conn.out_seq);
uint64_t ack_seq = conn.in_seq;
// ack_left = 0;
ceph_msg_header &header = msg->get_header();
ceph_msg_footer &footer = msg->get_footer();
@ -1573,7 +1577,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
header.type, header.priority,
header.version,
0, header.data_off,
ack_seq,
conn.in_seq,
footer.flags, header.compat_version,
header.reserved};
@ -1661,9 +1665,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
conn.in_seq = message->get_seq();
logger().debug("{} <== #{} === {} ({})",
conn, message->get_seq(), *message, message->get_type());
if (!conn.policy.lossy) {
// ++ack_left;
}
notify_ack();
ack_writes(current_header.ack_seq);
// TODO: change MessageRef with seastar::shared_ptr

View File

@ -29,7 +29,8 @@ class ProtocolV2 final : public Protocol {
const std::deque<MessageRef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack) override;
std::optional<utime_t> keepalive_ack,
bool require_ack) override;
private:
SocketMessenger &messenger;