diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index d6e24370552..5418abaa0ea 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1679,7 +1679,6 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->delay_state->flush(); assert(!delay_state); } - existing->requeue_sent(); existing->reset_recv_state(); auto temp_cs = std::move(cs); @@ -1706,8 +1705,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // we need to delete time event in original thread { std::lock_guard l(existing->lock); + existing->write_lock.lock(); + existing->requeue_sent(); existing->outcoming_bl.clear(); existing->open_write = false; + existing->write_lock.unlock(); if (existing->state == STATE_NONE) { existing->shutdown_socket(); existing->cs = std::move(cs); @@ -2431,6 +2433,7 @@ void AsyncConnection::handle_write() } auto start = ceph::mono_clock::now(); + bool more; do { bufferlist data; Message *m = _get_next_outgoing(&data); @@ -2442,13 +2445,14 @@ void AsyncConnection::handle_write() sent.push_back(m); m->get(); } + more = _has_next_outgoing(); write_lock.unlock(); // send_message or requeue messages may not encode message if (!data.length()) prepare_send_message(get_features(), m, data); - r = write_message(m, data, _has_next_outgoing()); + r = write_message(m, data, more); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail;