diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 6e771a99e15..68d6499099e 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -304,7 +304,7 @@ ssize_t AsyncConnection::write(bufferlist &bl, bool more) { std::unique_lock l(write_lock); - outcoming_bl.claim_append(bl); + outgoing_bl.claim_append(bl); ssize_t r = _try_send(more); if (r > 0) { writeCallback = callback; @@ -324,16 +324,16 @@ ssize_t AsyncConnection::_try_send(bool more) } ceph_assert(center->in_thread()); - ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length() + ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length() << " bytes" << dendl; - ssize_t r = cs.send(outcoming_bl, more); + ssize_t r = cs.send(outgoing_bl, more); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; return r; } ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r - << " remaining bytes " << outcoming_bl.length() << dendl; + << " remaining bytes " << outgoing_bl.length() << dendl; if (!open_write && is_queued()) { center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler); @@ -348,7 +348,7 @@ ssize_t AsyncConnection::_try_send(bool more) } } - return outcoming_bl.length(); + return outgoing_bl.length(); } void AsyncConnection::inject_delay() { @@ -600,7 +600,7 @@ void AsyncConnection::fault() recv_start = recv_end = 0; state_offset = 0; - outcoming_bl.clear(); + outgoing_bl.clear(); } void AsyncConnection::_stop() { @@ -618,7 +618,7 @@ void AsyncConnection::_stop() { } bool AsyncConnection::is_queued() const { - return outcoming_bl.length(); + return outgoing_bl.length(); } void AsyncConnection::shutdown_socket() { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3ce26e6d52a..9f6e004fe2b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -170,7 +170,7 @@ class AsyncConnection : public Connection { DispatchQueue *dispatch_queue; // lockfree, only used in own thread - bufferlist outcoming_bl; + bufferlist outgoing_bl; bool open_write = false; std::mutex write_lock; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 9f6160d7ef0..2cdbfb2e9eb 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -361,8 +361,8 @@ void ProtocolV1::write_event() { if (left) { ceph_le64 s; s = in_seq; - connection->outcoming_bl.append(CEPH_MSGR_TAG_ACK); - connection->outcoming_bl.append((char *)&s, sizeof(s)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_ACK); + connection->outgoing_bl.append((char *)&s, sizeof(s)); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -550,16 +550,16 @@ void ProtocolV1::append_keepalive_or_ack(bool ack, utime_t *tp) { ceph_assert(tp); struct ceph_timespec ts; tp->encode_timeval(&ts); - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); - connection->outcoming_bl.append((char *)&ts, sizeof(ts)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + connection->outgoing_bl.append((char *)&ts, sizeof(ts)); } else if (connection->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { struct ceph_timespec ts; utime_t t = ceph_clock_now(); t.encode_timeval(&ts); - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); - connection->outcoming_bl.append((char *)&ts, sizeof(ts)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); + connection->outgoing_bl.append((char *)&ts, sizeof(ts)); } else { - connection->outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE); + connection->outgoing_bl.append(CEPH_MSGR_TAG_KEEPALIVE); } } @@ -1075,9 +1075,9 @@ void ProtocolV1::session_reset() { connection->dispatch_queue->discard_queue(connection->conn_id); discard_out_queue(); - // note: we need to clear outcoming_bl here, but session_reset may be + // note: we need to clear outgoing_bl here, but session_reset may be // called by other thread, so let caller clear this itself! - // outcoming_bl.clear(); + // outgoing_bl.clear(); connection->dispatch_queue->queue_remote_reset(connection); @@ -1133,8 +1133,8 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { } } - connection->outcoming_bl.append(CEPH_MSGR_TAG_MSG); - connection->outcoming_bl.append((char *)&header, sizeof(header)); + connection->outgoing_bl.append(CEPH_MSGR_TAG_MSG); + connection->outgoing_bl.append((char *)&header, sizeof(header)); ldout(cct, 20) << __func__ << " sending message type=" << header.type << " src " << entity_name_t(header.src) @@ -1143,17 +1143,17 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) { for (const auto &pb : bl.buffers()) { - connection->outcoming_bl.append((char *)pb.c_str(), pb.length()); + connection->outgoing_bl.append((char *)pb.c_str(), pb.length()); } } else { - connection->outcoming_bl.claim_append(bl); + connection->outgoing_bl.claim_append(bl); } // send footer; if receiver doesn't support signatures, use the old footer // format ceph_msg_footer_old old_footer; if (connection->has_feature(CEPH_FEATURE_MSG_AUTH)) { - connection->outcoming_bl.append((char *)&footer, sizeof(footer)); + connection->outgoing_bl.append((char *)&footer, sizeof(footer)); } else { if (messenger->crcflags & MSG_CRC_HEADER) { old_footer.front_crc = footer.front_crc; @@ -1164,20 +1164,20 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { old_footer.data_crc = messenger->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; old_footer.flags = footer.flags; - connection->outcoming_bl.append((char *)&old_footer, sizeof(old_footer)); + connection->outgoing_bl.append((char *)&old_footer, sizeof(old_footer)); } m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m << dendl; - ssize_t total_send_size = connection->outcoming_bl.length(); + ssize_t total_send_size = connection->outgoing_bl.length(); ssize_t rc = connection->_try_send(more); if (rc < 0) { ldout(cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(rc) << dendl; } else { connection->logger->inc( - l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length()); + l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length()); ldout(cct, 10) << __func__ << " sending " << m << (rc ? " continuely." : " done.") << dendl; } @@ -1668,7 +1668,7 @@ CtPtr ProtocolV1::handle_connect_reply_2() { connect_seq = 0; // see session_reset - connection->outcoming_bl.clear(); + connection->outgoing_bl.clear(); return CONTINUE(send_connect_message); } @@ -2348,7 +2348,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing, std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); - existing->outcoming_bl.clear(); + existing->outgoing_bl.clear(); existing->open_write = false; existing->write_lock.unlock(); if (exproto->state == NONE) { diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index af02cc2d729..315a7ab2abb 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -145,7 +145,7 @@ void ProtocolV2::reset_session() { connection->dispatch_queue->discard_queue(connection->conn_id); discard_out_queue(); - connection->outcoming_bl.clear(); + connection->outgoing_bl.clear(); connection->dispatch_queue->queue_remote_reset(connection); @@ -508,7 +508,7 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { m->get_payload(), m->get_middle(), m->get_data()); - connection->outcoming_bl.append(message.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(message.get_buffer(session_stream_handlers)); ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; @@ -518,14 +518,14 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { << " src=" << entity_name_t(messenger->get_myname()) << " off=" << header2.data_off << dendl; - ssize_t total_send_size = connection->outcoming_bl.length(); + ssize_t total_send_size = connection->outgoing_bl.length(); ssize_t rc = connection->_try_send(more); if (rc < 0) { ldout(cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(rc) << dendl; } else { connection->logger->inc( - l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length()); + l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length()); ldout(cct, 10) << __func__ << " sending " << m << (rc ? " continuely." : " done.") << dendl; } @@ -544,12 +544,12 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { void ProtocolV2::append_keepalive() { ldout(cct, 10) << __func__ << dendl; auto keepalive_frame = KeepAliveFrame::Encode(); - connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers)); } void ProtocolV2::append_keepalive_ack(utime_t ×tamp) { auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp); - connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); } void ProtocolV2::handle_message_ack(uint64_t seq) { @@ -639,7 +639,7 @@ void ProtocolV2::write_event() { uint64_t left = ack_left; if (left) { auto ack = AckFrame::Encode(in_seq); - connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers)); + connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers)); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left -= left; @@ -2715,7 +2715,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, std::lock_guard l(existing->lock); existing->write_lock.lock(); exproto->requeue_sent(); - existing->outcoming_bl.clear(); + existing->outgoing_bl.clear(); existing->open_write = false; existing->write_lock.unlock(); if (exproto->state == NONE) {