mirror of
https://github.com/ceph/ceph
synced 2025-01-04 02:02:36 +00:00
msg/async: rename outcoming_bl -> outgoing_bl in AsyncConnection.
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
This commit is contained in:
parent
a9c2370ba8
commit
7997a3ea19
@ -304,7 +304,7 @@ ssize_t AsyncConnection::write(bufferlist &bl,
|
||||
bool more) {
|
||||
|
||||
std::unique_lock<std::mutex> l(write_lock);
|
||||
outcoming_bl.claim_append(bl);
|
||||
outgoing_bl.claim_append(bl);
|
||||
ssize_t r = _try_send(more);
|
||||
if (r > 0) {
|
||||
writeCallback = callback;
|
||||
@ -324,16 +324,16 @@ ssize_t AsyncConnection::_try_send(bool more)
|
||||
}
|
||||
|
||||
ceph_assert(center->in_thread());
|
||||
ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
|
||||
ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
|
||||
<< " bytes" << dendl;
|
||||
ssize_t r = cs.send(outcoming_bl, more);
|
||||
ssize_t r = cs.send(outgoing_bl, more);
|
||||
if (r < 0) {
|
||||
ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
|
||||
return r;
|
||||
}
|
||||
|
||||
ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
|
||||
<< " remaining bytes " << outcoming_bl.length() << dendl;
|
||||
<< " remaining bytes " << outgoing_bl.length() << dendl;
|
||||
|
||||
if (!open_write && is_queued()) {
|
||||
center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
|
||||
@ -348,7 +348,7 @@ ssize_t AsyncConnection::_try_send(bool more)
|
||||
}
|
||||
}
|
||||
|
||||
return outcoming_bl.length();
|
||||
return outgoing_bl.length();
|
||||
}
|
||||
|
||||
void AsyncConnection::inject_delay() {
|
||||
@ -600,7 +600,7 @@ void AsyncConnection::fault()
|
||||
|
||||
recv_start = recv_end = 0;
|
||||
state_offset = 0;
|
||||
outcoming_bl.clear();
|
||||
outgoing_bl.clear();
|
||||
}
|
||||
|
||||
void AsyncConnection::_stop() {
|
||||
@ -618,7 +618,7 @@ void AsyncConnection::_stop() {
|
||||
}
|
||||
|
||||
bool AsyncConnection::is_queued() const {
|
||||
return outcoming_bl.length();
|
||||
return outgoing_bl.length();
|
||||
}
|
||||
|
||||
void AsyncConnection::shutdown_socket() {
|
||||
|
@ -170,7 +170,7 @@ class AsyncConnection : public Connection {
|
||||
DispatchQueue *dispatch_queue;
|
||||
|
||||
// lockfree, only used in own thread
|
||||
bufferlist outcoming_bl;
|
||||
bufferlist outgoing_bl;
|
||||
bool open_write = false;
|
||||
|
||||
std::mutex write_lock;
|
||||
|
@ -361,8 +361,8 @@ void ProtocolV1::write_event() {
|
||||
if (left) {
|
||||
ceph_le64 s;
|
||||
s = in_seq;
|
||||
connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK);
|
||||
connection->outcoming_bl.append((char *)&s, sizeof(s));
|
||||
connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK);
|
||||
connection->outgoing_bl.append((char *)&s, sizeof(s));
|
||||
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
|
||||
<< " messages" << dendl;
|
||||
ack_left -= left;
|
||||
@ -550,16 +550,16 @@ void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) {
|
||||
ceph_assert(tp);
|
||||
struct ceph_timespec ts;
|
||||
tp->encode_timeval(&ts);
|
||||
connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
|
||||
connection->outcoming_bl.append((char *)&ts, sizeof(ts));
|
||||
connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
|
||||
connection->outgoing_bl.append((char *)&ts, sizeof(ts));
|
||||
} else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
|
||||
struct ceph_timespec ts;
|
||||
utime_t t = ceph_clock_now();
|
||||
t.encode_timeval(&ts);
|
||||
connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
|
||||
connection->outcoming_bl.append((char *)&ts, sizeof(ts));
|
||||
connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
|
||||
connection->outgoing_bl.append((char *)&ts, sizeof(ts));
|
||||
} else {
|
||||
connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
|
||||
connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1075,9 +1075,9 @@ void ProtocolV1::session_reset() {
|
||||
|
||||
connection->dispatch_queue->discard_queue(connection->conn_id);
|
||||
discard_out_queue();
|
||||
// note: we need to clear outcoming_bl here, but session_reset may be
|
||||
// note: we need to clear outgoing_bl here, but session_reset may be
|
||||
// called by other thread, so let caller clear this itself!
|
||||
// outcoming_bl.clear();
|
||||
// outgoing_bl.clear();
|
||||
|
||||
connection->dispatch_queue->queue_remote_reset(connection);
|
||||
|
||||
@ -1133,8 +1133,8 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
|
||||
}
|
||||
}
|
||||
|
||||
connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG);
|
||||
connection->outcoming_bl.append((char *)&header, sizeof(header));
|
||||
connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG);
|
||||
connection->outgoing_bl.append((char *)&header, sizeof(header));
|
||||
|
||||
ldout(cct, 20) << __func__ << " sending message type=" << header.type
|
||||
<< " src " << entity_name_t(header.src)
|
||||
@ -1143,17 +1143,17 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
|
||||
|
||||
if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
|
||||
for (const auto &pb : bl.buffers()) {
|
||||
connection->outcoming_bl.append((char *)pb.c_str(), pb.length());
|
||||
connection->outgoing_bl.append((char *)pb.c_str(), pb.length());
|
||||
}
|
||||
} else {
|
||||
connection->outcoming_bl.claim_append(bl);
|
||||
connection->outgoing_bl.claim_append(bl);
|
||||
}
|
||||
|
||||
// send footer; if receiver doesn't support signatures, use the old footer
|
||||
// format
|
||||
ceph_msg_footer_old old_footer;
|
||||
if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) {
|
||||
connection->outcoming_bl.append((char *)&footer, sizeof(footer));
|
||||
connection->outgoing_bl.append((char *)&footer, sizeof(footer));
|
||||
} else {
|
||||
if (messenger->crcflags & MSG_CRC_HEADER) {
|
||||
old_footer.front_crc = footer.front_crc;
|
||||
@ -1164,20 +1164,20 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
|
||||
old_footer.data_crc =
|
||||
messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
|
||||
old_footer.flags = footer.flags;
|
||||
connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer));
|
||||
connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer));
|
||||
}
|
||||
|
||||
m->trace.event("async writing message");
|
||||
ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
|
||||
<< dendl;
|
||||
ssize_t total_send_size = connection->outcoming_bl.length();
|
||||
ssize_t total_send_size = connection->outgoing_bl.length();
|
||||
ssize_t rc = connection->_try_send(more);
|
||||
if (rc < 0) {
|
||||
ldout(cct, 1) << __func__ << " error sending " << m << ", "
|
||||
<< cpp_strerror(rc) << dendl;
|
||||
} else {
|
||||
connection->logger->inc(
|
||||
l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
|
||||
l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
|
||||
ldout(cct, 10) << __func__ << " sending " << m
|
||||
<< (rc ? " continuely." : " done.") << dendl;
|
||||
}
|
||||
@ -1668,7 +1668,7 @@ CtPtr ProtocolV1::handle_connect_reply_2() {
|
||||
connect_seq = 0;
|
||||
|
||||
// see session_reset
|
||||
connection->outcoming_bl.clear();
|
||||
connection->outgoing_bl.clear();
|
||||
|
||||
return CONTINUE(send_connect_message);
|
||||
}
|
||||
@ -2348,7 +2348,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
|
||||
std::lock_guard<std::mutex> l(existing->lock);
|
||||
existing->write_lock.lock();
|
||||
exproto->requeue_sent();
|
||||
existing->outcoming_bl.clear();
|
||||
existing->outgoing_bl.clear();
|
||||
existing->open_write = false;
|
||||
existing->write_lock.unlock();
|
||||
if (exproto->state == NONE) {
|
||||
|
@ -145,7 +145,7 @@ void ProtocolV2::reset_session() {
|
||||
|
||||
connection->dispatch_queue->discard_queue(connection->conn_id);
|
||||
discard_out_queue();
|
||||
connection->outcoming_bl.clear();
|
||||
connection->outgoing_bl.clear();
|
||||
|
||||
connection->dispatch_queue->queue_remote_reset(connection);
|
||||
|
||||
@ -508,7 +508,7 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
|
||||
m->get_payload(),
|
||||
m->get_middle(),
|
||||
m->get_data());
|
||||
connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
|
||||
connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));
|
||||
|
||||
ldout(cct, 5) << __func__ << " sending message m=" << m
|
||||
<< " seq=" << m->get_seq() << " " << *m << dendl;
|
||||
@ -518,14 +518,14 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
|
||||
<< " src=" << entity_name_t(messenger->get_myname())
|
||||
<< " off=" << header2.data_off
|
||||
<< dendl;
|
||||
ssize_t total_send_size = connection->outcoming_bl.length();
|
||||
ssize_t total_send_size = connection->outgoing_bl.length();
|
||||
ssize_t rc = connection->_try_send(more);
|
||||
if (rc < 0) {
|
||||
ldout(cct, 1) << __func__ << " error sending " << m << ", "
|
||||
<< cpp_strerror(rc) << dendl;
|
||||
} else {
|
||||
connection->logger->inc(
|
||||
l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
|
||||
l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
|
||||
ldout(cct, 10) << __func__ << " sending " << m
|
||||
<< (rc ? " continuely." : " done.") << dendl;
|
||||
}
|
||||
@ -544,12 +544,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
|
||||
void ProtocolV2::append_keepalive() {
|
||||
ldout(cct, 10) << __func__ << dendl;
|
||||
auto keepalive_frame = KeepAliveFrame::Encode();
|
||||
connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
|
||||
connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
|
||||
}
|
||||
|
||||
void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
|
||||
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
|
||||
connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
|
||||
connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
|
||||
}
|
||||
|
||||
void ProtocolV2::handle_message_ack(uint64_t seq) {
|
||||
@ -639,7 +639,7 @@ void ProtocolV2::write_event() {
|
||||
uint64_t left = ack_left;
|
||||
if (left) {
|
||||
auto ack = AckFrame::Encode(in_seq);
|
||||
connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
|
||||
connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
|
||||
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
|
||||
<< " messages" << dendl;
|
||||
ack_left -= left;
|
||||
@ -2715,7 +2715,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
|
||||
std::lock_guard<std::mutex> l(existing->lock);
|
||||
existing->write_lock.lock();
|
||||
exproto->requeue_sent();
|
||||
existing->outcoming_bl.clear();
|
||||
existing->outgoing_bl.clear();
|
||||
existing->open_write = false;
|
||||
existing->write_lock.unlock();
|
||||
if (exproto->state == NONE) {
|
||||
|
Loading…
Reference in New Issue
Block a user