mirror of
https://github.com/ceph/ceph
synced 2025-04-01 00:26:47 +00:00
msg/async, v2: drop magic numbers for segments.
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
parent
50b5174637
commit
8d49bc3cce
@ -707,9 +707,10 @@ void ProtocolV2::reset_recv_state() {
|
||||
connection->pendingReadLen.reset();
|
||||
connection->writeCallback.reset();
|
||||
|
||||
uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
|
||||
rx_segments_desc[2].logical.length + \
|
||||
rx_segments_desc[3].logical.length;
|
||||
uint32_t cur_msg_size = \
|
||||
rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
|
||||
|
||||
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
|
||||
connection->policy.throttler_messages) {
|
||||
@ -1699,12 +1700,13 @@ CtPtr ProtocolV2::handle_message() {
|
||||
if (auth_meta->is_mode_secure()) {
|
||||
ceph_assert(session_stream_handlers.rx);
|
||||
|
||||
rx_segments_data[0] = \
|
||||
rx_segments_data[SegmentIndex::Msg::HEADER] = \
|
||||
session_stream_handlers.rx->authenticated_decrypt_update(
|
||||
std::move(rx_segments_data[0]),
|
||||
std::move(rx_segments_data[SegmentIndex::Msg::HEADER]),
|
||||
segment_t::DEFAULT_ALIGNMENT);
|
||||
}
|
||||
MessageHeaderFrame header_frame(std::move(rx_segments_data[0]));
|
||||
MessageHeaderFrame header_frame(
|
||||
std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
|
||||
ceph_msg_header2 &header = header_frame.header();
|
||||
|
||||
ldout(cct, 20) << __func__
|
||||
@ -1738,14 +1740,16 @@ CtPtr ProtocolV2::handle_message() {
|
||||
current_header = header;
|
||||
|
||||
// front
|
||||
ceph_assert(current_header.front_len == rx_segments_data[1].length());
|
||||
ceph_assert(current_header.front_len == \
|
||||
rx_segments_data[SegmentIndex::Msg::FRONT].length());
|
||||
ceph_assert(!front.length());
|
||||
front = std::move(rx_segments_data[1]);
|
||||
front = std::move(rx_segments_data[SegmentIndex::Msg::FRONT]);
|
||||
|
||||
// middle
|
||||
ceph_assert(current_header.middle_len == rx_segments_data[2].length());
|
||||
ceph_assert(current_header.middle_len == \
|
||||
rx_segments_data[SegmentIndex::Msg::MIDDLE].length());
|
||||
ceph_assert(!middle.length());
|
||||
middle = std::move(rx_segments_data[2]);
|
||||
middle = std::move(rx_segments_data[SegmentIndex::Msg::MIDDLE]);
|
||||
|
||||
next_payload_len -= sizeof(ceph_msg_header2);
|
||||
next_payload_len -= front.length();
|
||||
@ -2053,9 +2057,10 @@ CtPtr ProtocolV2::throttle_bytes() {
|
||||
ldout(cct, 20) << __func__ << dendl;
|
||||
|
||||
ceph_assert(rx_segments_desc.size() == 4);
|
||||
uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
|
||||
rx_segments_desc[2].logical.length + \
|
||||
rx_segments_desc[3].logical.length;
|
||||
uint32_t cur_msg_size = \
|
||||
rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
|
||||
if (cur_msg_size) {
|
||||
if (connection->policy.throttler_bytes) {
|
||||
ldout(cct, 10) << __func__ << " wants " << cur_msg_size
|
||||
@ -2088,9 +2093,10 @@ CtPtr ProtocolV2::throttle_dispatch_queue() {
|
||||
ldout(cct, 20) << __func__ << dendl;
|
||||
|
||||
ceph_assert(rx_segments_desc.size() == 4);
|
||||
uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
|
||||
rx_segments_desc[2].logical.length + \
|
||||
rx_segments_desc[3].logical.length;
|
||||
uint32_t cur_msg_size =
|
||||
rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
|
||||
rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
|
||||
|
||||
if (cur_msg_size) {
|
||||
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
|
||||
@ -2450,7 +2456,9 @@ CtPtr ProtocolV2::handle_wait() {
|
||||
ldout(cct, 1) << __func__ << " received WAIT (connection race)" << dendl;
|
||||
state = WAIT;
|
||||
ceph_assert(rx_segments_data.size() == 1);
|
||||
WaitFrame(*this, rx_segments_data[0].c_str(), rx_segments_data[0].length());
|
||||
ceph_assert(rx_segments_desc.size() == 1);
|
||||
WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD].c_str(),
|
||||
rx_segments_data[SegmentIndex::Frame::PAYLOAD].length());
|
||||
return _fault();
|
||||
}
|
||||
|
||||
|
@ -127,14 +127,27 @@ public:
|
||||
struct segment_t logical;
|
||||
} __attribute__((packed));
|
||||
|
||||
private:
|
||||
static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
|
||||
|
||||
struct SegmentIndex {
|
||||
struct Msg {
|
||||
static constexpr std::size_t HEADER = 0;
|
||||
static constexpr std::size_t FRONT = 1;
|
||||
static constexpr std::size_t MIDDLE = 2;
|
||||
static constexpr std::size_t DATA = 3;
|
||||
};
|
||||
|
||||
struct Frame {
|
||||
static constexpr std::size_t PAYLOAD = 0;
|
||||
};
|
||||
};
|
||||
|
||||
boost::container::static_vector<onwire_segment_t,
|
||||
MAX_NUM_SEGMENTS> rx_segments_desc;
|
||||
boost::container::static_vector<ceph::bufferlist,
|
||||
MAX_NUM_SEGMENTS> rx_segments_data;
|
||||
|
||||
private:
|
||||
|
||||
Tag next_tag;
|
||||
ceph_msg_header2 current_header;
|
||||
|
Loading…
Reference in New Issue
Block a user